/* Copyright (c) 2005 PrimeBase Technologies GmbH
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * 2005-09-30 Paul McCullagh
 */
#include "xt_config.h"
/* This header not available on suse-11-amd64, ubuntu-9.10-amd64 */

#ifdef DRIZZLED
#include <drizzled/base.h>
using namespace drizzled;
#else
#include "mysql_priv.h"
#endif

#include "pthread_xt.h"
#include "memory_xt.h"
#include "database_xt.h"
#include "strutil_xt.h"
#ifdef DEBUG
#define MAX_SEARCH_DEPTH	32
//#define CHECK_AND_PRINT
//#define CHECK_NODE_REFERENCE
//#define TRACE_FLUSH_INDEX
//#define CHECK_PRINTS_RECORD_REFERENCES
//#define DO_COMP_TEST
#else
#define MAX_SEARCH_DEPTH	100
#endif

//#define TRACE_FLUSH_TIMES
typedef struct IdxStackItem {
	XTIdxItemRec		i_pos;
	xtIndexNodeID		i_branch;
} IdxStackItemRec, *IdxStackItemPtr;

typedef struct IdxBranchStack {
	int					s_top;
	IdxStackItemRec		s_elements[MAX_SEARCH_DEPTH];
} IdxBranchStackRec, *IdxBranchStackPtr;
static void idx_check_on_key(XTOpenTablePtr ot);
86
static u_int idx_check_index(XTOpenTablePtr ot, XTIndexPtr ind, xtBool with_lock);
89
static xtBool idx_insert_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackPtr stack, xtBool last_item, XTIdxKeyValuePtr key_value, xtIndexNodeID branch);
90
static xtBool idx_remove_lazy_deleted_item_in_node(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID current, XTIndReferencePtr iref, XTIdxKeyValuePtr key_value);
92
#ifdef XT_TRACK_INDEX_UPDATES
94
static xtBool ind_track_write(struct XTOpenTable *ot, struct XTIndex *ind, xtIndexNodeID offset, size_t size, xtWord1 *data)
97
return xt_ind_write(ot, ind, offset, size, data);
100
#define XT_IND_WRITE ind_track_write
104
#define XT_IND_WRITE xt_ind_write
109
#ifdef CHECK_NODE_REFERENCE
110
#define IDX_GET_NODE_REF(t, x, o) idx_get_node_ref(t, x, o)
112
#define IDX_GET_NODE_REF(t, x, o) XT_GET_NODE_REF(t, (x) - (o))
116
* -----------------------------------------------------------------------
120
//#define TRACK_ACTIVITY
122
#ifdef TRACK_ACTIVITY
123
#define TRACK_MAX_BLOCKS 2000
125
typedef struct TrackBlock {
128
} TrackBlockRec, *TrackBlockPtr;
130
TrackBlockRec blocks[TRACK_MAX_BLOCKS];
132
xtPublic void track_work(u_int block, char *what)
136
ASSERT_NS(block > 0 && block <= TRACK_MAX_BLOCKS);
138
if (blocks[block].activity)
139
len = strlen(blocks[block].activity);
141
xt_realloc_ns((void **) &blocks[block].activity, len + len2 + 1);
142
memcpy(blocks[block].activity + len, what, len2 + 1);
145
static void track_block_exists(xtIndexNodeID block)
147
if (XT_NODE_ID(block) > 0 && XT_NODE_ID(block) <= TRACK_MAX_BLOCKS)
148
blocks[XT_NODE_ID(block)-1].exists = TRUE;
151
static void track_reset_missing()
153
for (u_int i=0; i<TRACK_MAX_BLOCKS; i++)
154
blocks[i].exists = FALSE;
157
static void track_dump_missing(xtIndexNodeID eof_block)
159
for (u_int i=0; i<XT_NODE_ID(eof_block)-1; i++) {
160
if (!blocks[i].exists)
161
printf("block missing = %04d %s\n", i+1, blocks[i].activity);
165
static void track_dump_all(u_int max_block)
167
for (u_int i=0; i<max_block; i++) {
168
if (blocks[i].exists)
169
printf(" %04d %s\n", i+1, blocks[i].activity);
171
printf("-%04d %s\n", i+1, blocks[i].activity);
177
xtPublic void xt_ind_track_dump_block(XTTableHPtr XT_UNUSED(tab), xtIndexNodeID XT_UNUSED(address))
179
#ifdef TRACK_ACTIVITY
180
u_int i = XT_NODE_ID(address)-1;
182
printf("BLOCK %04d %s\n", i+1, blocks[i].activity);
186
#ifdef CHECK_NODE_REFERENCE
187
static xtIndexNodeID idx_get_node_ref(XTTableHPtr tab, xtWord1 *ref, u_int node_ref_size)
191
/* Node is invalid by default: */
192
XT_NODE_ID(node) = 0xFFFFEEEE;
194
ref -= node_ref_size;
195
node = XT_RET_NODE_ID(XT_GET_DISK_4(ref));
196
if (node >= tab->tab_ind_eof) {
197
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
205
* -----------------------------------------------------------------------
209
static void idx_newstack(IdxBranchStackPtr stack)
214
static xtBool idx_push(IdxBranchStackPtr stack, xtIndexNodeID n, XTIdxItemPtr pos)
216
if (stack->s_top == MAX_SEARCH_DEPTH) {
217
xt_register_error(XT_REG_CONTEXT, XT_ERR_STACK_OVERFLOW, 0, "Index node stack overflow");
220
stack->s_elements[stack->s_top].i_branch = n;
222
stack->s_elements[stack->s_top].i_pos = *pos;
227
static IdxStackItemPtr idx_pop(IdxBranchStackPtr stack)
229
if (stack->s_top == 0)
232
return &stack->s_elements[stack->s_top];
235
static IdxStackItemPtr idx_top(IdxBranchStackPtr stack)
237
if (stack->s_top == 0)
239
return &stack->s_elements[stack->s_top-1];
243
* -----------------------------------------------------------------------
244
* Allocation of nodes
248
* Allocating and freeing blocks for an index is safe because this is a structural
* change which requires an exclusive lock on the index!
*/
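/* Illustrative sketch of the allocation order used by the code below: reuse a
 * block from the in-memory free list if one is cached, otherwise grow the index
 * file at EOF. The names free_list, free_count and eof_id are hypothetical; the
 * real routine also consults the table's cached free list and the on-disk free
 * chain before extending the file.
 */
#if 0
typedef unsigned int node_id_t;

static node_id_t alloc_index_block(node_id_t *free_list, unsigned *free_count, node_id_t *eof_id)
{
	/* 1. Reuse a previously freed block if one is cached: */
	if (*free_count > 0) {
		(*free_count)--;
		return free_list[*free_count];
	}
	/* 2. Otherwise grow the index file by one block: */
	return (*eof_id)++;
}
#endif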
251
static xtBool idx_new_branch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID *address)
253
register XTTableHPtr tab;
254
xtIndexNodeID wrote_pos;
255
XTIndFreeBlockRec free_block;
256
XTIndFreeListPtr list_ptr;
260
//ASSERT_NS(XT_INDEX_HAVE_XLOCK(ind, ot));
261
if (ind->mi_free_list && ind->mi_free_list->fl_free_count) {
262
ind->mi_free_list->fl_free_count--;
263
*address = ind->mi_free_list->fl_page_id[ind->mi_free_list->fl_free_count];
264
TRACK_BLOCK_ALLOC(*address);
268
xt_lock_mutex_ns(&tab->tab_ind_lock);
270
/* Check the cached free list: */
271
while ((list_ptr = tab->tab_ind_free_list)) {
272
if (list_ptr->fl_start < list_ptr->fl_free_count) {
273
wrote_pos = list_ptr->fl_page_id[list_ptr->fl_start];
274
list_ptr->fl_start++;
275
xt_unlock_mutex_ns(&tab->tab_ind_lock);
276
*address = wrote_pos;
277
TRACK_BLOCK_ALLOC(wrote_pos);
280
tab->tab_ind_free_list = list_ptr->fl_next_list;
281
xt_free_ns(list_ptr);
284
if ((XT_NODE_ID(wrote_pos) = XT_NODE_ID(tab->tab_ind_free))) {
285
/* Use the block on the free list: */
286
if (!xt_ind_read_bytes(ot, ind, wrote_pos, sizeof(XTIndFreeBlockRec), (xtWord1 *) &free_block))
288
XT_NODE_ID(tab->tab_ind_free) = (xtIndexNodeID) XT_GET_DISK_8(free_block.if_next_block_8);
289
xt_unlock_mutex_ns(&tab->tab_ind_lock);
290
*address = wrote_pos;
291
TRACK_BLOCK_ALLOC(wrote_pos);
295
/* PMC - Don't allow overflow! */
296
if (XT_NODE_ID(tab->tab_ind_eof) >= 0xFFFFFFF) {
297
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_FILE_TO_LARGE, xt_file_path(ot->ot_ind_file));
300
*address = tab->tab_ind_eof;
301
XT_NODE_ID(tab->tab_ind_eof)++;
302
xt_unlock_mutex_ns(&tab->tab_ind_lock);
303
TRACK_BLOCK_ALLOC(*address);
307
xt_unlock_mutex_ns(&tab->tab_ind_lock);
311
/* Add the block to the private free list of the index.
 * On flush, this list will be transferred to the global list.
 */
314
static xtBool idx_free_branch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID node_id)
316
register u_int count;
318
register u_int guess;
320
TRACK_BLOCK_FREE(node_id);
321
//ASSERT_NS(XT_INDEX_HAVE_XLOCK(ind, ot));
322
if (!ind->mi_free_list) {
324
if (!(ind->mi_free_list = (XTIndFreeListPtr) xt_calloc_ns(offsetof(XTIndFreeListRec, fl_page_id) + 10 * sizeof(xtIndexNodeID))))
328
count = ind->mi_free_list->fl_free_count;
329
if (!xt_realloc_ns((void **) &ind->mi_free_list, offsetof(XTIndFreeListRec, fl_page_id) + (count + 1) * sizeof(xtIndexNodeID)))
335
guess = (i + count - 1) >> 1;
336
if (XT_NODE_ID(node_id) == XT_NODE_ID(ind->mi_free_list->fl_page_id[guess])) {
337
// Should not happen...
341
if (XT_NODE_ID(node_id) < XT_NODE_ID(ind->mi_free_list->fl_page_id[guess]))
347
/* Insert at position i */
348
memmove(ind->mi_free_list->fl_page_id + i + 1, ind->mi_free_list->fl_page_id + i, (ind->mi_free_list->fl_free_count - i) * sizeof(xtIndexNodeID));
349
ind->mi_free_list->fl_page_id[i] = node_id;
350
ind->mi_free_list->fl_free_count++;
352
/* Set the cache page to clean: */
353
return xt_ind_free_block(ot, ind, node_id);
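/* Illustrative sketch of the sorted insert performed above on the private free
 * list: binary-search for the slot, then shift the tail up by one entry. Plain C
 * with hypothetical names, shown in isolation from the XT types.
 */
#if 0
#include <string.h>

static void sorted_insert(unsigned *ids, unsigned *count, unsigned id)
{
	unsigned i = 0, n = *count;

	while (i < n) {						/* binary search for the insert position */
		unsigned guess = (i + n - 1) >> 1;
		if (id == ids[guess])
			return;						/* already present - should not happen */
		if (id < ids[guess])
			n = guess;
		else
			i = guess + 1;
	}
	/* Insert at position i: */
	memmove(ids + i + 1, ids + i, (*count - i) * sizeof(unsigned));
	ids[i] = id;
	(*count)++;
}
#endif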
357
* -----------------------------------------------------------------------
358
* Simple compare functions
361
xtPublic int xt_compare_2_int4(XTIndexPtr XT_UNUSED(ind), uint key_length, xtWord1 *key_value, xtWord1 *b_value)
365
ASSERT_NS(key_length == 4 || key_length == 8);
366
r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
367
	if (r == 0 && key_length > 4) {
		key_value += 4;
		b_value += 4;
		r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
	}
	return r;
}
375
xtPublic int xt_compare_3_int4(XTIndexPtr XT_UNUSED(ind), uint key_length, xtWord1 *key_value, xtWord1 *b_value)
379
ASSERT_NS(key_length == 4 || key_length == 8 || key_length == 12);
380
r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
381
	if (r == 0 && key_length > 4) {
		key_value += 4;
		b_value += 4;
		r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
		if (r == 0 && key_length > 8) {
			key_value += 4;
			b_value += 4;
			r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
		}
	}
	return r;
}
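/* Illustrative sketch of how these fixed-size composite keys are compared:
 * segment by segment, stopping at the first non-zero difference. The helper
 * get_u32_be() and the big-endian byte order are assumptions made only for
 * this sketch; the real code uses XT_GET_DISK_4 on the storage format.
 */
#if 0
static unsigned get_u32_be(const unsigned char *p)
{
	return ((unsigned) p[0] << 24) | ((unsigned) p[1] << 16) |
	       ((unsigned) p[2] << 8)  |  (unsigned) p[3];
}

static int compare_int4_key(const unsigned char *a, const unsigned char *b, unsigned key_length)
{
	unsigned	offset;
	int			r = 0;

	for (offset = 0; offset < key_length; offset += 4) {
		r = (int) get_u32_be(a + offset) - (int) get_u32_be(b + offset);
		if (r != 0)
			break;					/* the first differing segment decides the order */
	}
	return r;
}
#endif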
395
* -----------------------------------------------------------------------
396
* Tree branch scanning (searching nodes and leaves)
399
xtPublic void xt_scan_branch_single(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
404
u_int full_item_size;
406
register xtWord1 *base;
408
register xtWord1 *bitem;
411
branch_size = XT_GET_DISK_2(branch->tb_size_2);
412
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
414
result->sr_found = FALSE;
415
result->sr_duplicate = FALSE;
416
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
417
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
419
result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
420
full_item_size = result->sr_item.i_item_size + node_ref_size;
421
result->sr_item.i_node_ref_size = node_ref_size;
423
search_flags = value->sv_flags;
424
base = branch->tb_data + node_ref_size;
425
total_count = (result->sr_item.i_total_size - node_ref_size) / full_item_size;
426
if (search_flags & XT_SEARCH_FIRST_FLAG)
428
else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG)
431
register u_int guess;
432
register u_int count;
434
xtRecordID key_record;
436
key_record = value->sv_rec_id;
442
guess = (i + count - 1) >> 1;
444
bitem = base + guess * full_item_size;
446
switch (ind->mi_single_type) {
447
case HA_KEYTYPE_LONG_INT: {
448
register xtInt4 a, b;
450
a = XT_GET_DISK_4(value->sv_key);
451
b = XT_GET_DISK_4(bitem);
452
r = (a < b) ? -1 : (a == b ? 0 : 1);
455
case HA_KEYTYPE_ULONG_INT: {
456
register xtWord4 a, b;
458
a = XT_GET_DISK_4(value->sv_key);
459
b = XT_GET_DISK_4(bitem);
460
r = (a < b) ? -1 : (a == b ? 0 : 1);
464
/* Should not happen: */
469
if (search_flags & XT_SEARCH_WHOLE_KEY) {
470
xtRecordID item_record;
473
xt_get_record_ref(bitem + ind->mi_key_size, &item_record, &row_id);
475
/* This should not happen because we should never
476
* try to insert the same record twice into the
479
result->sr_duplicate = TRUE;
480
if (key_record == item_record) {
481
result->sr_found = TRUE;
482
result->sr_rec_id = item_record;
483
result->sr_row_id = row_id;
484
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
485
result->sr_item.i_item_offset = node_ref_size + guess * full_item_size;
488
if (key_record < item_record)
494
result->sr_found = TRUE;
495
/* -1 causes a search to the beginning of the duplicate list of keys.
496
* 1 causes a search to just after the key.
498
if (search_flags & XT_SEARCH_AFTER_KEY)
512
bitem = base + i * full_item_size;
513
xt_get_res_record_ref(bitem + ind->mi_key_size, result);
514
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size); /* Only valid if this is a node. */
515
result->sr_item.i_item_offset = node_ref_size + i * full_item_size;
516
#ifdef IND_SKEW_SPLIT_ON_APPEND
517
if (i != total_count)
518
result->sr_last_item = FALSE;
523
* We use a special binary search here. It basically assumes that the values
524
* in the index are not unique.
526
* Even if they are unique, when we search for part of a key, then it is
527
* effectively the case.
529
* So in the situation where we find duplicates in the index we usually
530
* want to position ourselves at the beginning of the duplicate list.
532
* Alternatively a search can find the position just after a given key.
534
* To achieve this we make the following modifications:
535
* - The result of the comparison always returns 1 or -1. We only stop
*   the search early in the case of an exact match when inserting (but this
*   should not happen anyway).
538
* - The search never actually fails, but sets 'found' to TRUE if it
539
* sees the search key in the index.
541
* If the search value exists in the index we know that
542
* this method will take us to the first occurrence of the key in the
543
* index (in the case of -1) or to the first value after
* the search key in the case of 1.
*/
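/* Illustrative sketch of the modified binary search described above, reduced to
 * plain integer keys: by treating "equal" as "go left" (or "go right" for an
 * after-key search) the loop always ends at the first duplicate (or just after
 * the last one), and "found" is simply recorded on the way down. All names are
 * hypothetical.
 */
#if 0
typedef struct ScanResult {
	unsigned	index;			/* item the search stopped on */
	int			found;			/* key was seen during the descent */
} ScanResult;

static ScanResult scan_items(const int *keys, unsigned total, int search, int after_key)
{
	ScanResult	res = { 0, 0 };
	unsigned	i = 0, count = total;

	while (i < count) {
		unsigned guess = (i + count - 1) >> 1;
		int r = (search < keys[guess]) ? -1 : (search == keys[guess] ? 0 : 1);

		if (r == 0) {
			res.found = 1;
			r = after_key ? 1 : -1;		/* never stop: go right or left of duplicates */
		}
		if (r < 0)
			count = guess;
		else
			i = guess + 1;
	}
	res.index = i;		/* first duplicate, or the position just after the key */
	return res;
}
#endif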
546
xtPublic void xt_scan_branch_fix(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
551
u_int full_item_size;
558
branch_size = XT_GET_DISK_2(branch->tb_size_2);
559
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
561
result->sr_found = FALSE;
562
result->sr_duplicate = FALSE;
563
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
564
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
566
result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
567
full_item_size = result->sr_item.i_item_size + node_ref_size;
568
result->sr_item.i_node_ref_size = node_ref_size;
570
search_flags = value->sv_flags;
571
base = branch->tb_data + node_ref_size;
572
total_count = (result->sr_item.i_total_size - node_ref_size) / full_item_size;
573
if (search_flags & XT_SEARCH_FIRST_FLAG)
575
else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG)
578
register u_int guess;
579
register u_int count;
580
xtRecordID key_record;
583
key_record = value->sv_rec_id;
589
guess = (i + count - 1) >> 1;
591
bitem = base + guess * full_item_size;
593
r = myxt_compare_key(ind, search_flags, value->sv_length, value->sv_key, bitem);
596
if (search_flags & XT_SEARCH_WHOLE_KEY) {
597
xtRecordID item_record;
600
xt_get_record_ref(bitem + ind->mi_key_size, &item_record, &row_id);
602
/* This should not happen because we should never
603
* try to insert the same record twice into the
606
result->sr_duplicate = TRUE;
607
if (key_record == item_record) {
608
result->sr_found = TRUE;
609
result->sr_rec_id = item_record;
610
result->sr_row_id = row_id;
611
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
612
result->sr_item.i_item_offset = node_ref_size + guess * full_item_size;
615
if (key_record < item_record)
621
result->sr_found = TRUE;
622
/* -1 causes a search to the beginning of the duplicate list of keys.
623
* 1 causes a search to just after the key.
625
if (search_flags & XT_SEARCH_AFTER_KEY)
639
bitem = base + i * full_item_size;
640
xt_get_res_record_ref(bitem + ind->mi_key_size, result);
641
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size); /* Only valid if this is a node. */
642
result->sr_item.i_item_offset = node_ref_size + i * full_item_size;
643
#ifdef IND_SKEW_SPLIT_ON_APPEND
644
if (i != total_count)
645
result->sr_last_item = FALSE;
649
xtPublic void xt_scan_branch_fix_simple(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
654
u_int full_item_size;
661
branch_size = XT_GET_DISK_2(branch->tb_size_2);
662
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
664
result->sr_found = FALSE;
665
result->sr_duplicate = FALSE;
666
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
667
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
669
result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
670
full_item_size = result->sr_item.i_item_size + node_ref_size;
671
result->sr_item.i_node_ref_size = node_ref_size;
673
search_flags = value->sv_flags;
674
base = branch->tb_data + node_ref_size;
675
total_count = (result->sr_item.i_total_size - node_ref_size) / full_item_size;
676
if (search_flags & XT_SEARCH_FIRST_FLAG)
678
else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG)
681
register u_int guess;
682
register u_int count;
683
xtRecordID key_record;
686
key_record = value->sv_rec_id;
692
guess = (i + count - 1) >> 1;
694
bitem = base + guess * full_item_size;
696
r = ind->mi_simple_comp_key(ind, value->sv_length, value->sv_key, bitem);
699
if (search_flags & XT_SEARCH_WHOLE_KEY) {
700
xtRecordID item_record;
703
xt_get_record_ref(bitem + ind->mi_key_size, &item_record, &row_id);
705
/* This should not happen because we should never
706
* try to insert the same record twice into the
709
result->sr_duplicate = TRUE;
710
if (key_record == item_record) {
711
result->sr_found = TRUE;
712
result->sr_rec_id = item_record;
713
result->sr_row_id = row_id;
714
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
715
result->sr_item.i_item_offset = node_ref_size + guess * full_item_size;
718
if (key_record < item_record)
724
result->sr_found = TRUE;
725
/* -1 causes a search to the beginning of the duplicate list of keys.
726
* 1 causes a search to just after the key.
728
if (search_flags & XT_SEARCH_AFTER_KEY)
742
bitem = base + i * full_item_size;
743
xt_get_res_record_ref(bitem + ind->mi_key_size, result);
744
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size); /* Only valid if this is a node. */
745
result->sr_item.i_item_offset = node_ref_size + i * full_item_size;
746
#ifdef IND_SKEW_SPLIT_ON_APPEND
747
if (i != total_count)
748
result->sr_last_item = FALSE;
753
* Variable length key values are stored as a sorted list. Since each list item has a variable length, we
* must scan the list sequentially in order to find a key.
*/
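/* Illustrative sketch of the sequential scan needed for variable-length items:
 * each item's length is only known once it is decoded, so the scan walks item
 * by item until the key compares greater than or equal to the search key. The
 * one-length-byte encoding is a toy assumption for this sketch; the real code
 * uses myxt_get_key_length() and myxt_compare_key().
 */
#if 0
#include <string.h>

/* Toy encoding for the sketch: one length byte followed by the key bytes. */
static unsigned item_key_length(const unsigned char *item) { return item[0]; }

static const unsigned char *scan_var_items(const unsigned char *item, const unsigned char *end,
										   const unsigned char *key, unsigned key_len, unsigned ref_size)
{
	while (item < end) {
		unsigned	ilen = item_key_length(item);
		unsigned	min_len = ilen < key_len ? ilen : key_len;
		int			r = memcmp(key, item + 1, min_len);

		if (r < 0 || (r == 0 && key_len <= ilen))
			break;							/* first item whose key is >= the search key */
		item += 1 + ilen + ref_size;		/* length byte + key + record reference */
	}
	return item;
}
#endif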
756
xtPublic void xt_scan_branch_var(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
767
branch_size = XT_GET_DISK_2(branch->tb_size_2);
768
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
770
result->sr_found = FALSE;
771
result->sr_duplicate = FALSE;
772
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
773
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
775
result->sr_item.i_node_ref_size = node_ref_size;
777
search_flags = value->sv_flags;
778
base = branch->tb_data + node_ref_size;
780
bend = &branch->tb_data[result->sr_item.i_total_size];
785
if (search_flags & XT_SEARCH_FIRST_FLAG)
786
ilen = myxt_get_key_length(ind, bitem);
787
else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG) {
792
xtRecordID key_record;
795
key_record = value->sv_rec_id;
798
while (bitem < bend) {
799
ilen = myxt_get_key_length(ind, bitem);
800
r = myxt_compare_key(ind, search_flags, value->sv_length, value->sv_key, bitem);
802
if (search_flags & XT_SEARCH_WHOLE_KEY) {
803
xtRecordID item_record;
806
xt_get_record_ref(bitem + ilen, &item_record, &row_id);
808
/* This should not happen because we should never
809
* try to insert the same record twice into the
812
result->sr_duplicate = TRUE;
813
if (key_record == item_record) {
814
result->sr_found = TRUE;
815
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
816
result->sr_rec_id = item_record;
817
result->sr_row_id = row_id;
818
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
819
result->sr_item.i_item_offset = bitem - branch->tb_data;
822
if (key_record < item_record)
828
result->sr_found = TRUE;
829
/* -1 causes a search to the beginning of the duplicate list of keys.
830
* 1 causes a search to just after the key.
832
if (search_flags & XT_SEARCH_AFTER_KEY)
840
bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
845
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
846
xt_get_res_record_ref(bitem + ilen, result);
847
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size); /* Only valid if this is a node. */
848
result->sr_item.i_item_offset = bitem - branch->tb_data;
849
#ifdef IND_SKEW_SPLIT_ON_APPEND
851
result->sr_last_item = FALSE;
855
/* Go to the next item in the node. */
856
static void idx_next_branch_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultRec *result)
862
result->sr_item.i_item_offset += result->sr_item.i_item_size + result->sr_item.i_node_ref_size;
863
bitem = branch->tb_data + result->sr_item.i_item_offset;
864
if (result->sr_item.i_item_offset < result->sr_item.i_total_size) {
866
ilen = result->sr_item.i_item_size;
868
ilen = myxt_get_key_length(ind, bitem) + XT_RECORD_REF_SIZE;
869
result->sr_item.i_item_size = ilen;
871
xt_get_res_record_ref(bitem + ilen - XT_RECORD_REF_SIZE, result); /* (Only valid if i_item_offset < i_total_size) */
874
result->sr_item.i_item_size = 0;
875
result->sr_rec_id = 0;
876
result->sr_row_id = 0;
878
if (result->sr_item.i_node_ref_size)
879
/* IDX_GET_NODE_REF() loads the branch reference to the LEFT of the item. */
880
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, result->sr_item.i_node_ref_size);
882
result->sr_branch = 0;
885
xtPublic void xt_prev_branch_item_fix(XTTableHPtr XT_UNUSED(tab), XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr branch, register XTIdxResultRec *result)
888
ASSERT_NS(result->sr_item.i_item_offset >= result->sr_item.i_item_size + result->sr_item.i_node_ref_size + result->sr_item.i_node_ref_size);
889
result->sr_item.i_item_offset -= (result->sr_item.i_item_size + result->sr_item.i_node_ref_size);
890
xt_get_res_record_ref(branch->tb_data + result->sr_item.i_item_offset + result->sr_item.i_item_size - XT_RECORD_REF_SIZE, result); /* (Only valid if i_item_offset < i_total_size) */
891
result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + result->sr_item.i_item_offset, result->sr_item.i_node_ref_size);
894
xtPublic void xt_prev_branch_item_var(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultRec *result)
901
bitem = branch->tb_data + result->sr_item.i_node_ref_size;
902
bend = &branch->tb_data[result->sr_item.i_item_offset];
904
ilen = myxt_get_key_length(ind, bitem);
905
if (bitem + ilen + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size >= bend)
907
bitem += ilen + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size;
910
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
911
xt_get_res_record_ref(bitem + ilen, result); /* (Only valid if i_item_offset < i_total_size) */
912
result->sr_branch = IDX_GET_NODE_REF(tab, bitem, result->sr_item.i_node_ref_size);
913
result->sr_item.i_item_offset = bitem - branch->tb_data;
916
static void idx_reload_item_fix(XTIndexPtr XT_NDEBUG_UNUSED(ind), XTIdxBranchDPtr branch, register XTIdxResultPtr result)
920
branch_size = XT_GET_DISK_2(branch->tb_size_2);
921
ASSERT_NS(result->sr_item.i_node_ref_size == (XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0));
922
ASSERT_NS(result->sr_item.i_item_size == ind->mi_key_size + XT_RECORD_REF_SIZE);
923
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
924
if (result->sr_item.i_item_offset > result->sr_item.i_total_size)
925
result->sr_item.i_item_offset = result->sr_item.i_total_size;
926
xt_get_res_record_ref(&branch->tb_data[result->sr_item.i_item_offset + result->sr_item.i_item_size - XT_RECORD_REF_SIZE], result);
929
static void idx_first_branch_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultPtr result)
936
branch_size = XT_GET_DISK_2(branch->tb_size_2);
937
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
939
result->sr_found = FALSE;
940
result->sr_duplicate = FALSE;
941
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
942
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
945
key_data_size = ind->mi_key_size;
949
bitem = branch->tb_data + node_ref_size;
950
if (bitem < &branch->tb_data[result->sr_item.i_total_size])
951
key_data_size = myxt_get_key_length(ind, bitem);
956
result->sr_item.i_item_size = key_data_size + XT_RECORD_REF_SIZE;
957
result->sr_item.i_node_ref_size = node_ref_size;
959
xt_get_res_record_ref(branch->tb_data + node_ref_size + key_data_size, result);
960
result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + node_ref_size, node_ref_size); /* Only valid if this is a node. */
961
result->sr_item.i_item_offset = node_ref_size;
965
* Last means different things for leaf or node!
967
xtPublic void xt_last_branch_item_fix(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultPtr result)
973
branch_size = XT_GET_DISK_2(branch->tb_size_2);
974
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
976
result->sr_found = FALSE;
977
result->sr_duplicate = FALSE;
978
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
979
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
981
result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
982
result->sr_item.i_node_ref_size = node_ref_size;
985
result->sr_item.i_item_offset = result->sr_item.i_total_size;
986
result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + result->sr_item.i_item_offset, node_ref_size);
989
if (result->sr_item.i_total_size) {
990
result->sr_item.i_item_offset = result->sr_item.i_total_size - result->sr_item.i_item_size;
991
xt_get_res_record_ref(branch->tb_data + result->sr_item.i_item_offset + ind->mi_key_size, result);
995
result->sr_item.i_item_offset = 0;
999
xtPublic void xt_last_branch_item_var(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultPtr result)
1003
u_int node_ref_size;
1005
branch_size = XT_GET_DISK_2(branch->tb_size_2);
1006
node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;
1008
result->sr_found = FALSE;
1009
result->sr_duplicate = FALSE;
1010
result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
1011
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);
1013
result->sr_item.i_node_ref_size = node_ref_size;
1015
if (node_ref_size) {
1016
result->sr_item.i_item_offset = result->sr_item.i_total_size;
1017
result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + result->sr_item.i_item_offset, node_ref_size);
1018
result->sr_item.i_item_size = 0;
1021
if (result->sr_item.i_total_size) {
1026
bitem = branch->tb_data + node_ref_size;
1027
bend = &branch->tb_data[result->sr_item.i_total_size];
1031
ilen = myxt_get_key_length(ind, bitem);
1032
if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend)
1034
bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
1038
result->sr_item.i_item_offset = bitem - branch->tb_data;
1039
xt_get_res_record_ref(bitem + ilen, result);
1040
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
1043
/* Leaf is empty: */
1044
result->sr_item.i_item_offset = 0;
1045
result->sr_item.i_item_size = 0;
1050
xtPublic xtBool xt_idx_lazy_delete_on_leaf(XTIndexPtr ind, XTIndBlockPtr block, xtWord2 branch_size)
1052
ASSERT_NS(ind->mi_fix_key);
1054
/* Compact the leaf if more than half the items that fit on the page
 * have been deleted: */
if (block->cp_del_count >= ind->mi_max_items/2)
1059
/* Compact the page if there is only 1 (or less) valid item left: */
1060
if ((u_int) block->cp_del_count+1 >= ((u_int) branch_size - 2)/(ind->mi_key_size + XT_RECORD_REF_SIZE))
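/* Illustrative sketch of the two thresholds tested above (the early-return
 * branches are not shown in the surrounding fragment): lazy deletion appears
 * to be refused, forcing a real delete or compaction, once half of the items
 * that fit on a page are already marked deleted, or once at most one valid
 * item would remain on this particular leaf. Names are hypothetical.
 */
#if 0
static int lazy_delete_allowed_on_leaf(unsigned del_count, unsigned max_items_per_page,
										unsigned items_on_leaf)
{
	if (del_count >= max_items_per_page / 2)
		return 0;				/* too many lazily deleted items on the page */
	if (del_count + 1 >= items_on_leaf)
		return 0;				/* one or fewer valid items would remain */
	return 1;
}
#endif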
1066
static xtBool idx_lazy_delete_on_node(XTIndexPtr ind, XTIndBlockPtr block, register XTIdxItemPtr item)
1068
ASSERT_NS(ind->mi_fix_key);
1070
/* Compact the node if more than 1/4 of the items that fit on the page
 * have been deleted: */
if (block->cp_del_count >= ind->mi_max_items/4)
1075
/* Compact the page if there is only 1 (or less) valid item left: */
1076
if ((u_int) block->cp_del_count+1 >= (item->i_total_size - item->i_node_ref_size)/(item->i_item_size + item->i_node_ref_size))
1082
inline static xtBool idx_cmp_item_key_fix(XTIndReferencePtr iref, register XTIdxItemPtr item, XTIdxKeyValuePtr value)
1086
data = &iref->ir_branch->tb_data[item->i_item_offset];
1087
return memcmp(data, value->sv_key, value->sv_length) == 0;
1090
inline static void idx_set_item_key_fix(XTIndReferencePtr iref, register XTIdxItemPtr item, XTIdxKeyValuePtr value)
1094
data = &iref->ir_branch->tb_data[item->i_item_offset];
1095
memcpy(data, value->sv_key, value->sv_length);
1096
xt_set_val_record_ref(data + value->sv_length, value);
1097
#ifdef IND_OPT_DATA_WRITTEN
1098
if (item->i_item_offset < iref->ir_block->cb_min_pos)
1099
iref->ir_block->cb_min_pos = item->i_item_offset;
1100
if (item->i_item_offset + value->sv_length > iref->ir_block->cb_max_pos)
1101
iref->ir_block->cb_max_pos = item->i_item_offset + value->sv_length;
1102
ASSERT_NS(iref->ir_block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1103
ASSERT_NS(iref->ir_block->cb_min_pos <= iref->ir_block->cb_max_pos);
1105
iref->ir_updated = TRUE;
1108
static xtBool idx_set_item_row_id(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item, xtRowID row_id)
1110
register XTIndBlockPtr block = iref->ir_block;
1114
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1115
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1116
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1117
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1123
/* This is the offset of the reference in the item we found: */
1124
item->i_item_offset + item->i_item_size - XT_RECORD_REF_SIZE +
1125
/* This is the offset of the row id in the reference: */
1127
data = &iref->ir_branch->tb_data[offset];
1129
/* This update does not change the structure of page, so we do it without
1130
* copying the page before we write.
1132
XT_SET_DISK_4(data, row_id);
1133
#ifdef IND_OPT_DATA_WRITTEN
1134
if (offset < block->cb_min_pos)
1135
block->cb_min_pos = offset;
1136
if (offset + XT_ROW_ID_SIZE > block->cb_max_pos)
1137
block->cb_max_pos = offset + XT_ROW_ID_SIZE;
1138
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1139
ASSERT_NS(block->cb_min_pos <= iref->ir_block->cb_max_pos);
1141
iref->ir_updated = TRUE;
1145
inline static xtBool idx_is_item_deleted(register XTIdxBranchDPtr branch, register XTIdxItemPtr item)
1149
data = &branch->tb_data[item->i_item_offset + item->i_item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE];
1150
return XT_GET_DISK_4(data) == (xtRowID) -1;
1153
static xtBool idx_set_item_deleted(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
1155
if (!idx_set_item_row_id(ot, ind, iref, item, (xtRowID) -1))
1158
/* This should be safe because there is only one thread,
1159
* the sweeper, that does this!
1161
* Threads that decrement this value have an xlock on
1162
* the page, or the index.
1164
iref->ir_block->cp_del_count++;
1169
* {LAZY-DEL-INDEX-ITEMS}
*
* Do a lazy delete of an item by just setting the Row ID
* to the delete indicator: row ID -1.
*/
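/* Illustrative sketch of the lazy-delete marker itself: the 4-byte row ID stored
 * in an item's record reference is overwritten with the all-ones value
 * ((xtRowID) -1), and readers treat such items as deleted. The byte-wise layout
 * and helper names below are assumptions made only for the sketch.
 */
#if 0
static void mark_item_deleted(unsigned char *row_id_field)
{
	/* Store the delete indicator in place of the row ID: */
	row_id_field[0] = 0xFF;
	row_id_field[1] = 0xFF;
	row_id_field[2] = 0xFF;
	row_id_field[3] = 0xFF;
}

static int item_is_deleted(const unsigned char *row_id_field)
{
	return row_id_field[0] == 0xFF && row_id_field[1] == 0xFF &&
		   row_id_field[2] == 0xFF && row_id_field[3] == 0xFF;
}
#endif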
1173
static xtBool idx_lazy_delete_branch_item(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
1175
if (!idx_set_item_deleted(ot, ind, iref, item))
1177
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_R_UPDATE, iref);
1182
* This function compacts the leaf, but preserves the
1183
* position of the item.
1185
static xtBool idx_compact_leaf(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
1187
register XTIndBlockPtr block = iref->ir_block;
1188
register XTIdxBranchDPtr branch = iref->ir_branch;
1189
int item_idx, count, i, idx;
1196
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1197
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1198
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1199
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1204
if (block->cb_handle_count) {
1205
if (!xt_ind_copy_on_write(iref)) {
1206
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1211
ASSERT_NS(!item->i_node_ref_size);
1212
ASSERT_NS(ind->mi_fix_key);
1213
size = item->i_item_size;
1214
count = item->i_total_size / size;
1215
item_idx = item->i_item_offset / size;
1216
s_data = d_data = branch->tb_data;
1218
for (i=0; i<count; i++) {
1219
data = s_data + item->i_item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE;
1220
row_id = XT_GET_DISK_4(data);
1221
if (row_id == (xtRowID) -1) {
1226
if (d_data != s_data)
1227
memcpy(d_data, s_data, size);
1233
block->cp_del_count = 0;
1234
item->i_total_size = d_data - branch->tb_data;
1235
ASSERT_NS(idx * size == item->i_total_size);
1236
item->i_item_offset = item_idx * size;
1237
XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, 0));
1238
#ifdef IND_OPT_DATA_WRITTEN
1239
block->cb_header = TRUE;
1240
block->cb_min_pos = 0;
1241
block->cb_max_pos = item->i_total_size;
1242
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1243
ASSERT_NS(block->cb_min_pos <= iref->ir_block->cb_max_pos);
1245
iref->ir_updated = TRUE;
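/* Illustrative sketch of the compaction loop above for fixed-size items: valid
 * items are copied down over lazily deleted ones, and the index of the caller's
 * item is remapped to its new slot so its position is preserved. The is_deleted()
 * predicate stands in for the row-ID == -1 test; all names are hypothetical.
 */
#if 0
#include <string.h>

static unsigned compact_fixed_items(unsigned char *data, unsigned count, unsigned size,
									unsigned keep_idx, unsigned *new_keep_idx,
									int (*is_deleted)(const unsigned char *item))
{
	unsigned char	*src = data, *dst = data;
	unsigned		i, out = 0;

	for (i = 0; i < count; i++, src += size) {
		if (i == keep_idx)
			*new_keep_idx = out;		/* where the tracked item ends up */
		if (is_deleted(src))
			continue;					/* drop lazily deleted items */
		if (dst != src)
			memmove(dst, src, size);
		dst += size;
		out++;
	}
	return out;							/* number of items kept */
}
#endif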
1249
static xtBool idx_lazy_remove_leaf_item_right(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
1251
register XTIndBlockPtr block = iref->ir_block;
1252
register XTIdxBranchDPtr branch = iref->ir_branch;
1253
int item_idx, count, i;
1260
ASSERT_NS(!item->i_node_ref_size);
1262
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1263
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1264
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1265
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1270
if (block->cb_handle_count) {
1271
if (!xt_ind_copy_on_write(iref)) {
1272
xt_ind_release(ot, ind, XT_UNLOCK_WRITE, iref);
1277
ASSERT_NS(ind->mi_fix_key);
1278
size = item->i_item_size;
1279
count = item->i_total_size / size;
1280
item_idx = item->i_item_offset / size;
1281
s_data = d_data = branch->tb_data;
1282
for (i=0; i<count; i++) {
1284
item->i_item_offset = d_data - branch->tb_data;
1286
data = s_data + item->i_item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE;
1287
row_id = XT_GET_DISK_4(data);
1288
if (row_id != (xtRowID) -1) {
1289
if (d_data != s_data)
1290
memcpy(d_data, s_data, size);
1296
block->cp_del_count = 0;
1297
item->i_total_size = d_data - branch->tb_data;
1298
XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, 0));
1299
#ifdef IND_OPT_DATA_WRITTEN
1300
block->cb_header = TRUE;
1301
block->cb_min_pos = 0;
1302
block->cb_max_pos = item->i_total_size;
1303
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1304
ASSERT_NS(block->cb_min_pos <= iref->ir_block->cb_max_pos);
1306
iref->ir_updated = TRUE;
1307
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, iref);
1312
* Remove an item and save to disk.
1314
static xtBool idx_remove_branch_item_right(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID, XTIndReferencePtr iref, register XTIdxItemPtr item)
1316
register XTIndBlockPtr block = iref->ir_block;
1317
register XTIdxBranchDPtr branch = iref->ir_branch;
1318
u_int size = item->i_item_size + item->i_node_ref_size;
1320
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1321
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1322
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1323
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1328
/* {HANDLE-COUNT-USAGE}
1329
* This access is safe because we have the right to update
1330
* the page, so no other thread can modify the page.
1333
* We either have an Xlock on the index, or we have
1334
* an Xlock on the cache block.
1336
if (block->cb_handle_count) {
1337
if (!xt_ind_copy_on_write(iref)) {
1338
xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, iref);
1342
if (ind->mi_lazy_delete) {
1343
if (idx_is_item_deleted(branch, item))
1344
block->cp_del_count--;
1346
/* Remove the node reference to the left of the item: */
1347
memmove(&branch->tb_data[item->i_item_offset],
1348
&branch->tb_data[item->i_item_offset + size],
1349
item->i_total_size - item->i_item_offset - size);
1350
item->i_total_size -= size;
1351
XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, item->i_node_ref_size));
1352
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(branch->tb_size_2));
1353
#ifdef IND_OPT_DATA_WRITTEN
1354
block->cb_header = TRUE;
1355
if (item->i_item_offset < block->cb_min_pos)
1356
block->cb_min_pos = item->i_item_offset;
1357
block->cb_max_pos = item->i_total_size;
1358
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1359
ASSERT_NS(block->cb_min_pos <= block->cb_max_pos);
1361
iref->ir_updated = TRUE;
1362
xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_W_UPDATE, iref);
1366
static xtBool idx_remove_branch_item_left(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID, XTIndReferencePtr iref, register XTIdxItemPtr item, xtBool *lazy_delete_cleanup_required)
1368
register XTIndBlockPtr block = iref->ir_block;
1369
register XTIdxBranchDPtr branch = iref->ir_branch;
1370
u_int size = item->i_item_size + item->i_node_ref_size;
1372
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1373
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1374
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
1375
xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
1380
ASSERT_NS(item->i_node_ref_size);
1381
if (block->cb_handle_count) {
1382
if (!xt_ind_copy_on_write(iref)) {
1383
xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, iref);
1387
if (ind->mi_lazy_delete) {
1388
if (idx_is_item_deleted(branch, item))
1389
block->cp_del_count--;
1390
if (lazy_delete_cleanup_required)
1391
*lazy_delete_cleanup_required = idx_lazy_delete_on_node(ind, block, item);
1393
/* Remove the node reference to the left of the item: */
1394
memmove(&branch->tb_data[item->i_item_offset - item->i_node_ref_size],
1395
&branch->tb_data[item->i_item_offset + item->i_item_size],
1396
item->i_total_size - item->i_item_offset - item->i_item_size);
1397
item->i_total_size -= size;
1398
XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, item->i_node_ref_size));
1399
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(branch->tb_size_2));
1400
#ifdef IND_OPT_DATA_WRITTEN
1401
block->cb_header = TRUE;
1402
if (item->i_item_offset - item->i_node_ref_size < block->cb_min_pos)
1403
block->cb_min_pos = item->i_item_offset - item->i_node_ref_size;
1404
block->cb_max_pos = item->i_total_size;
1405
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
1406
ASSERT_NS(block->cb_min_pos <= block->cb_max_pos);
1408
iref->ir_updated = TRUE;
1409
xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_W_UPDATE, iref);
1413
static void idx_insert_leaf_item(XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr leaf, XTIdxKeyValuePtr value, XTIdxResultPtr result)
1417
/* This will ensure we do not overwrite the end of the buffer: */
1418
ASSERT_NS(value->sv_length <= XT_INDEX_MAX_KEY_SIZE);
1419
memmove(&leaf->tb_data[result->sr_item.i_item_offset + value->sv_length + XT_RECORD_REF_SIZE],
1420
&leaf->tb_data[result->sr_item.i_item_offset],
1421
result->sr_item.i_total_size - result->sr_item.i_item_offset);
1422
item = &leaf->tb_data[result->sr_item.i_item_offset];
1423
memcpy(item, value->sv_key, value->sv_length);
1424
xt_set_val_record_ref(item + value->sv_length, value);
1425
result->sr_item.i_total_size += value->sv_length + XT_RECORD_REF_SIZE;
1426
XT_SET_DISK_2(leaf->tb_size_2, XT_MAKE_LEAF_SIZE(result->sr_item.i_total_size));
1429
static void idx_insert_node_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr leaf, XTIdxKeyValuePtr value, XTIdxResultPtr result, xtIndexNodeID branch)
1433
/* This will ensure we do not overwrite the end of the buffer: */
1434
ASSERT_NS(value->sv_length <= XT_INDEX_MAX_KEY_SIZE);
1435
memmove(&leaf->tb_data[result->sr_item.i_item_offset + value->sv_length + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size],
1436
&leaf->tb_data[result->sr_item.i_item_offset],
1437
result->sr_item.i_total_size - result->sr_item.i_item_offset);
1438
item = &leaf->tb_data[result->sr_item.i_item_offset];
1439
memcpy(item, value->sv_key, value->sv_length);
1440
xt_set_val_record_ref(item + value->sv_length, value);
1441
XT_SET_NODE_REF(tab, item + value->sv_length + XT_RECORD_REF_SIZE, branch);
1442
result->sr_item.i_total_size += value->sv_length + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size;
1443
XT_SET_DISK_2(leaf->tb_size_2, XT_MAKE_NODE_SIZE(result->sr_item.i_total_size));
1446
static xtBool idx_get_middle_branch_item(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxBranchDPtr branch, XTIdxKeyValuePtr value, XTIdxResultPtr result)
1450
ASSERT_NS(result->sr_item.i_node_ref_size == 0 || result->sr_item.i_node_ref_size == XT_NODE_REF_SIZE);
1451
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE*2);
1452
if (ind->mi_fix_key) {
1453
u_int full_item_size = result->sr_item.i_item_size + result->sr_item.i_node_ref_size;
1455
result->sr_item.i_item_offset = ((result->sr_item.i_total_size - result->sr_item.i_node_ref_size)
1456
/ full_item_size / 2 * full_item_size) + result->sr_item.i_node_ref_size;
1457
#ifdef IND_SKEW_SPLIT_ON_APPEND
1458
if (result->sr_last_item) {
1461
offset = result->sr_item.i_total_size - full_item_size * 2;
1462
/* We actually split at the item before last! */
1463
if (offset > result->sr_item.i_item_offset)
1464
result->sr_item.i_item_offset = offset;
1468
bitem = &branch->tb_data[result->sr_item.i_item_offset];
1469
value->sv_flags = XT_SEARCH_WHOLE_KEY;
1470
value->sv_length = result->sr_item.i_item_size - XT_RECORD_REF_SIZE;
1471
xt_get_record_ref(bitem + value->sv_length, &value->sv_rec_id, &value->sv_row_id);
1472
memcpy(value->sv_key, bitem, value->sv_length);
1475
u_int node_ref_size;
1479
node_ref_size = result->sr_item.i_node_ref_size;
1480
bitem = branch->tb_data + node_ref_size;
1481
bend = &branch->tb_data[(result->sr_item.i_total_size - node_ref_size) / 2 + node_ref_size];
1482
#ifdef IND_SKEW_SPLIT_ON_APPEND
1483
if (result->sr_last_item)
1484
bend = &branch->tb_data[XT_INDEX_PAGE_DATA_SIZE];
1486
u_int prev_ilen = 0;
1487
xtWord1 *prev_bitem = NULL;
1493
ilen = myxt_get_key_length(ind, bitem);
1494
tlen += ilen + XT_RECORD_REF_SIZE + node_ref_size;
1495
if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend) {
1496
if (ilen > XT_INDEX_PAGE_SIZE || tlen > result->sr_item.i_total_size) {
1497
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
1502
#ifdef IND_SKEW_SPLIT_ON_APPEND
1506
bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
1510
#ifdef IND_SKEW_SPLIT_ON_APPEND
1511
/* We actually want the item before last! */
1512
if (result->sr_last_item && prev_bitem) {
1517
result->sr_item.i_item_offset = bitem - branch->tb_data;
1518
result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
1520
value->sv_flags = XT_SEARCH_WHOLE_KEY;
1521
value->sv_length = ilen;
1522
xt_get_record_ref(bitem + ilen, &value->sv_rec_id, &value->sv_row_id);
1523
memcpy(value->sv_key, bitem, value->sv_length);
1528
static size_t idx_write_branch_item(XTIndexPtr XT_UNUSED(ind), xtWord1 *item, XTIdxKeyValuePtr value)
1530
memcpy(item, value->sv_key, value->sv_length);
1531
xt_set_val_record_ref(item + value->sv_length, value);
1532
return value->sv_length + XT_RECORD_REF_SIZE;
1535
static xtBool idx_replace_node_key(XTOpenTablePtr ot, XTIndexPtr ind, IdxStackItemPtr item, IdxBranchStackPtr stack, u_int item_size, xtWord1 *item_buf)
1537
XTIndReferenceRec iref;
1538
xtIndexNodeID new_branch;
1539
XTIdxResultRec result;
1540
xtIndexNodeID current = item->i_branch;
1542
XTIdxBranchDPtr new_branch_ptr;
1543
XTIdxKeyValueRec key_value;
1544
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
1548
iref.ir_updated = 2;
1550
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_WRITE, &iref))
1553
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1554
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1555
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
1559
if (iref.ir_block->cb_handle_count) {
1560
if (!xt_ind_copy_on_write(&iref))
1564
if (ind->mi_lazy_delete) {
1565
ASSERT_NS(item_size == item->i_pos.i_item_size);
1566
if (idx_is_item_deleted(iref.ir_branch, &item->i_pos))
1567
iref.ir_block->cp_del_count--;
1569
memmove(&iref.ir_branch->tb_data[item->i_pos.i_item_offset + item_size],
1570
&iref.ir_branch->tb_data[item->i_pos.i_item_offset + item->i_pos.i_item_size],
1571
item->i_pos.i_total_size - item->i_pos.i_item_offset - item->i_pos.i_item_size);
1572
memcpy(&iref.ir_branch->tb_data[item->i_pos.i_item_offset],
1573
item_buf, item_size);
1574
if (ind->mi_lazy_delete) {
1575
if (idx_is_item_deleted(iref.ir_branch, &item->i_pos))
1576
iref.ir_block->cp_del_count++;
1578
item->i_pos.i_total_size = item->i_pos.i_total_size + item_size - item->i_pos.i_item_size;
1579
XT_SET_DISK_2(iref.ir_branch->tb_size_2, XT_MAKE_NODE_SIZE(item->i_pos.i_total_size));
1580
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(iref.ir_branch->tb_size_2));
1581
#ifdef IND_OPT_DATA_WRITTEN
1582
iref.ir_block->cb_header = TRUE;
1583
if (item->i_pos.i_item_offset < iref.ir_block->cb_min_pos)
1584
iref.ir_block->cb_min_pos = item->i_pos.i_item_offset;
1585
iref.ir_block->cb_max_pos = item->i_pos.i_total_size;
1586
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
1588
iref.ir_updated = TRUE;
1591
if (ind->mi_lazy_delete)
1592
ASSERT_NS(item->i_pos.i_total_size <= XT_INDEX_PAGE_DATA_SIZE);
1594
if (item->i_pos.i_total_size <= XT_INDEX_PAGE_DATA_SIZE)
1595
return xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
1597
/* The node has overflowed!! */
1598
#ifdef IND_SKEW_SPLIT_ON_APPEND
1599
result.sr_last_item = FALSE;
1601
result.sr_item = item->i_pos;
1603
/* Adjust the stack (we want the parents of the deleted node): */
1605
if (idx_pop(stack) == item)
1609
/* We assume that value can be overwritten (which is the case) */
1610
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
1611
key_value.sv_key = key_buf;
1612
if (!idx_get_middle_branch_item(ot, ind, iref.ir_branch, &key_value, &result))
1615
if (!idx_new_branch(ot, ind, &new_branch))
1618
/* Split the node: */
1619
new_size = result.sr_item.i_total_size - result.sr_item.i_item_offset - result.sr_item.i_item_size;
1620
// TODO: Are 2 buffers now required?
1621
new_branch_ptr = (XTIdxBranchDPtr) &ot->ot_ind_wbuf.tb_data[XT_INDEX_PAGE_DATA_SIZE];
1622
memmove(new_branch_ptr->tb_data, &iref.ir_branch->tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size], new_size);
1624
XT_SET_DISK_2(new_branch_ptr->tb_size_2, XT_MAKE_NODE_SIZE(new_size));
1625
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(new_branch_ptr->tb_size_2));
1626
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + new_size, (xtWord1 *) new_branch_ptr))
1629
/* Change the size of the old branch: */
1630
XT_SET_DISK_2(iref.ir_branch->tb_size_2, XT_MAKE_NODE_SIZE(result.sr_item.i_item_offset));
1631
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(iref.ir_branch->tb_size_2));
1632
#ifdef IND_OPT_DATA_WRITTEN
1633
iref.ir_block->cb_header = TRUE;
1634
if (result.sr_item.i_item_offset < iref.ir_block->cb_min_pos)
1635
iref.ir_block->cb_min_pos = result.sr_item.i_item_offset;
1636
iref.ir_block->cb_max_pos = result.sr_item.i_item_offset;
1637
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
1638
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
1640
iref.ir_updated = TRUE;
1641
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
1643
/* Insert the new branch into the parent node, using the new middle key value: */
1644
if (!idx_insert_node(ot, ind, stack, FALSE, &key_value, new_branch)) {
1646
* TODO: Mark the index as corrupt.
1647
* This should not fail because everything has been
1649
* However, if it does fail the index
1651
* I could modify and release the branch above,
1653
* But that would mean holding the lock longer,
1654
* and also may not help because idx_insert_node()
1657
idx_free_branch(ot, ind, new_branch);
1664
idx_free_branch(ot, ind, new_branch);
1667
xt_ind_release(ot, ind, XT_UNLOCK_WRITE, &iref);
1673
* -----------------------------------------------------------------------
1674
* Standard b-tree insert
1678
* Insert the given branch into the node on the top of the stack. If the stack
* is empty we need to add a new root.
*/
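/* Illustrative sketch of the recursion described above, on a toy in-memory
 * B-tree: insert the middle key and the new right-hand branch into the parent
 * popped from the stack; if the parent overflows, split it and keep going up;
 * if the stack is exhausted, create a new root over the two halves. All names,
 * the integer keys and the fixed capacity are assumptions for this sketch only,
 * and error handling is omitted.
 */
#if 0
#include <stdlib.h>
#include <string.h>

enum { MAX_KEYS = 4 };					/* toy node capacity */

typedef struct Node Node;
struct Node {
	int		nkeys;
	int		keys[MAX_KEYS];
	Node	*child[MAX_KEYS + 1];
};

/* Insert (mid_key, right) into 'parent' at position 'pos'; caller guarantees room. */
static void node_insert_at(Node *parent, int pos, int mid_key, Node *right)
{
	memmove(&parent->keys[pos + 1], &parent->keys[pos], (parent->nkeys - pos) * sizeof(int));
	memmove(&parent->child[pos + 2], &parent->child[pos + 1], (parent->nkeys - pos) * sizeof(Node *));
	parent->keys[pos] = mid_key;
	parent->child[pos + 1] = right;
	parent->nkeys++;
}

/* Insert the new right-hand branch into the parents on the stack; returns the
 * (possibly new) root. */
static Node *insert_into_parents(Node **stack, int depth, Node *root, int mid_key, Node *right)
{
	while (depth > 0) {
		Node	*parent = stack[--depth];	/* pop the next parent */
		int		pos = 0;

		while (pos < parent->nkeys && parent->keys[pos] < mid_key)
			pos++;
		if (parent->nkeys < MAX_KEYS) {
			node_insert_at(parent, pos, mid_key, right);
			return root;					/* it fits: done */
		}
		/* Parent overflows: split around its middle key and continue one level up. */
		{
			Node	*split = (Node *) calloc(1, sizeof(Node));
			int		mid = parent->nkeys / 2;
			int		up_key = parent->keys[mid];

			split->nkeys = parent->nkeys - mid - 1;
			memcpy(split->keys, &parent->keys[mid + 1], split->nkeys * sizeof(int));
			memcpy(split->child, &parent->child[mid + 1], (split->nkeys + 1) * sizeof(Node *));
			parent->nkeys = mid;
			if (mid_key <= up_key)
				node_insert_at(parent, pos, mid_key, right);
			else
				node_insert_at(split, pos - mid - 1, mid_key, right);
			mid_key = up_key;
			right = split;
		}
	}
	/* Stack exhausted: create a new root referencing the two halves. */
	{
		Node *new_root = (Node *) calloc(1, sizeof(Node));
		new_root->nkeys = 1;
		new_root->keys[0] = mid_key;
		new_root->child[0] = root;
		new_root->child[1] = right;
		return new_root;
	}
}
#endif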
1681
static xtBool idx_insert_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackPtr stack, xtBool last_item, XTIdxKeyValuePtr key_value, xtIndexNodeID branch)
1683
IdxStackItemPtr stack_item;
1684
xtIndexNodeID new_branch;
1686
xtIndexNodeID current;
1687
XTIndReferenceRec iref;
1688
XTIdxResultRec result;
1690
XTIdxBranchDPtr new_branch_ptr;
1691
#ifdef IND_OPT_DATA_WRITTEN
1697
iref.ir_updated = 2;
1699
/* Insert a new branch (key, data)... */
1700
if (!(stack_item = idx_pop(stack))) {
1704
if (!idx_new_branch(ot, ind, &new_branch))
1707
ditem = ot->ot_ind_wbuf.tb_data;
1708
XT_SET_NODE_REF(ot->ot_table, ditem, ind->mi_root);
1709
ditem += XT_NODE_REF_SIZE;
1710
ditem += idx_write_branch_item(ind, ditem, key_value);
1711
XT_SET_NODE_REF(ot->ot_table, ditem, branch);
1712
ditem += XT_NODE_REF_SIZE;
1713
size = ditem - ot->ot_ind_wbuf.tb_data;
1714
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_NODE_SIZE(size));
1715
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1716
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + size, (xtWord1 *) &ot->ot_ind_wbuf))
1718
ind->mi_root = new_branch;
1722
current = stack_item->i_branch;
1723
/* This read does not count (towards ot_ind_reads), because we are only
1724
* counting each loaded page once. We assume that the page is in
1725
* cache, and will remain in cache when we read again below for the
1726
* purpose of update.
1728
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
1730
ASSERT_NS(XT_IS_NODE(XT_GET_DISK_2(iref.ir_branch->tb_size_2)));
1731
#ifdef IND_SKEW_SPLIT_ON_APPEND
1732
result.sr_last_item = last_item;
1734
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, key_value, &result);
1736
if (result.sr_item.i_total_size + key_value->sv_length + XT_RECORD_REF_SIZE + result.sr_item.i_node_ref_size <= XT_INDEX_PAGE_DATA_SIZE) {
1737
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1738
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1739
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
1743
if (iref.ir_block->cb_handle_count) {
1744
if (!xt_ind_copy_on_write(&iref))
1748
idx_insert_node_item(ot->ot_table, ind, iref.ir_branch, key_value, &result, branch);
1749
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1750
#ifdef IND_OPT_DATA_WRITTEN
1751
iref.ir_block->cb_header = TRUE;
1752
if (result.sr_item.i_item_offset < iref.ir_block->cb_min_pos)
1753
iref.ir_block->cb_min_pos = result.sr_item.i_item_offset;
1754
iref.ir_block->cb_max_pos = result.sr_item.i_total_size;
1755
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
1756
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
1758
iref.ir_updated = TRUE;
1759
ASSERT_NS(result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE);
1760
xt_ind_release(ot, ind, XT_UNLOCK_R_UPDATE, &iref);
1764
memcpy(&ot->ot_ind_wbuf, iref.ir_branch, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_total_size);
1765
idx_insert_node_item(ot->ot_table, ind, &ot->ot_ind_wbuf, key_value, &result, branch);
1766
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1767
ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE);
1768
#ifdef IND_OPT_DATA_WRITTEN
1769
new_min_pos = result.sr_item.i_item_offset;
1772
/* We assume that value can be overwritten (which is the case) */
1773
if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, key_value, &result))
1776
if (!idx_new_branch(ot, ind, &new_branch))
1779
/* Split the node: */
1780
new_size = result.sr_item.i_total_size - result.sr_item.i_item_offset - result.sr_item.i_item_size;
1781
new_branch_ptr = (XTIdxBranchDPtr) &ot->ot_ind_wbuf.tb_data[XT_INDEX_PAGE_DATA_SIZE];
1782
memmove(new_branch_ptr->tb_data, &ot->ot_ind_wbuf.tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size], new_size);
1784
XT_SET_DISK_2(new_branch_ptr->tb_size_2, XT_MAKE_NODE_SIZE(new_size));
1785
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(new_branch_ptr->tb_size_2));
1786
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + new_size, (xtWord1 *) new_branch_ptr))
1789
/* Change the size of the old branch: */
1790
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_NODE_SIZE(result.sr_item.i_item_offset));
1791
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
1793
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
1794
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
1795
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
1799
if (iref.ir_block->cb_handle_count) {
1800
if (!xt_ind_copy_on_write(&iref))
1804
#ifdef IND_OPT_DATA_WRITTEN
1805
if (result.sr_item.i_item_offset < new_min_pos)
1806
new_min_pos = result.sr_item.i_item_offset;
1808
memcpy(iref.ir_branch, &ot->ot_ind_wbuf, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_item_offset);
1809
#ifdef IND_OPT_DATA_WRITTEN
1810
iref.ir_block->cb_header = TRUE;
1811
if (new_min_pos < iref.ir_block->cb_min_pos)
1812
iref.ir_block->cb_min_pos = new_min_pos;
1813
iref.ir_block->cb_max_pos = result.sr_item.i_item_offset;
1814
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
1815
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
1817
iref.ir_updated = TRUE;
1818
xt_ind_release(ot, ind, XT_UNLOCK_R_UPDATE, &iref);
1820
/* Insert the new branch into the parent node, using the new middle key value: */
1821
if (!idx_insert_node(ot, ind, stack, last_item, key_value, new_branch)) {
1822
// Index may be inconsistent now...
1823
idx_free_branch(ot, ind, new_branch);
1831
idx_free_branch(ot, ind, new_branch);
1834
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
1840
#define IDX_MAX_INDEX_FLUSH_COUNT 10
1842
struct IdxTableItem {
1843
xtTableID ti_tab_id;
1844
u_int ti_dirty_blocks;
1847
inline u_int idx_dirty_blocks(XTTableHPtr tab)
1854
indp = tab->tab_dic.dic_keys;
1855
for (int i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
1857
dirty_blocks += ind->mi_dirty_blocks;
1859
return dirty_blocks;
1862
static xtBool idx_out_of_memory_failure(XTOpenTablePtr ot)
1864
#ifdef XT_TRACK_INDEX_UPDATES
1865
/* If the index has been changed when we run out of memory, we
1866
* will corrupt the index!
1868
ASSERT_NS(ot->ot_ind_changed == 0);
1870
if (ot->ot_thread->t_exception.e_xt_err == XT_ERR_NO_INDEX_CACHE) {
1871
u_int block_total = xt_ind_get_blocks();
1873
/* Flush index and retry! */
1874
xt_clear_exception(ot->ot_thread);
1876
if (idx_dirty_blocks(ot->ot_table) >= block_total / 4) {
1877
if (!xt_async_flush_indices(ot->ot_table, FALSE, TRUE, ot->ot_thread))
1879
if (!xt_wait_for_async_task_results(ot->ot_thread))
1883
XTDatabaseHPtr db = ot->ot_table->tab_db;
1884
IdxTableItem table_list[IDX_MAX_INDEX_FLUSH_COUNT];
1888
XTTableEntryPtr tab_ptr;
1890
u_int dirty_total = 0;
1892
xt_ht_lock(NULL, db->db_tables);
1893
xt_enum_tables_init(&edx);
1894
while ((tab_ptr = xt_enum_tables_next(NULL, db, &edx))) {
1895
if (tab_ptr->te_table) {
1896
if (tab_ptr->te_table->tab_ind_flush_task->tk_is_running()) {
1897
if (!(dirty_blocks = tab_ptr->te_table->tab_ind_flush_task->fit_dirty_blocks))
1898
dirty_blocks = idx_dirty_blocks(tab_ptr->te_table);
1901
dirty_blocks = idx_dirty_blocks(tab_ptr->te_table);
1902
dirty_total += dirty_blocks;
1904
for (i=0; i<item_count; i++) {
1905
if (table_list[i].ti_dirty_blocks < dirty_blocks)
1908
if (i < IDX_MAX_INDEX_FLUSH_COUNT) {
1911
if (item_count < IDX_MAX_INDEX_FLUSH_COUNT) {
1912
cnt = item_count - i;
1916
cnt = item_count - i - 1;
1917
memmove(&table_list[i], &table_list[i+1], sizeof(IdxTableItem) * cnt);
1918
table_list[i].ti_tab_id = tab_ptr->te_table->tab_id;
1919
table_list[i].ti_dirty_blocks = dirty_blocks;
1922
if (dirty_total >= block_total / 4)
1926
xt_ht_unlock(NULL, db->db_tables);
1927
if (dirty_total >= block_total / 4) {
1928
for (i=0; i<item_count; i++) {
1929
if (table_list[i].ti_tab_id == ot->ot_table->tab_id) {
1930
if (!xt_async_flush_indices(ot->ot_table, FALSE, TRUE, ot->ot_thread))
1937
if ((tab = xt_use_table_by_id_ns(db, table_list[i].ti_tab_id))) {
1938
ok = xt_async_flush_indices(tab, FALSE, TRUE, ot->ot_thread);
1939
xt_heap_release_ns(tab);
1943
if (!xt_wait_for_async_task_results(ot->ot_thread))
1954
* Check all the duplicate variations in an index.
1955
* If one of them is visible, then we have a duplicate key
1958
* GOTCHA: This routine must use the write index buffer!
1960
static xtBool idx_check_duplicates(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxKeyValuePtr key_value)
1962
IdxBranchStackRec stack;
1963
xtIndexNodeID current;
1964
XTIndReferenceRec iref;
1965
XTIdxResultRec result;
1966
xtBool on_key = FALSE;
1973
iref.ir_updated = 2;
1976
idx_newstack(&stack);
1978
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
1981
save_flags = key_value->sv_flags;
1982
key_value->sv_flags = 0;
1984
while (XT_NODE_ID(current)) {
1985
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref)) {
1986
key_value->sv_flags = save_flags;
1989
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, key_value, &result);
1990
if (result.sr_found)
1991
/* If we have found the key in a node: */
1993
if (!result.sr_item.i_node_ref_size)
1995
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
1996
if (!idx_push(&stack, current, &result.sr_item)) {
1997
key_value->sv_flags = save_flags;
2000
current = result.sr_branch;
2003
key_value->sv_flags = save_flags;
2006
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2011
if (result.sr_item.i_item_offset == result.sr_item.i_total_size) {
2012
IdxStackItemPtr node;
2014
/* We are at the end of a leaf node.
2015
* Go up the stack to find the start position of the next key.
2016
* If we find none, then we are at the end of the index.
2018
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2019
while ((node = idx_pop(&stack))) {
2020
if (node->i_pos.i_item_offset < node->i_pos.i_total_size) {
2021
current = node->i_branch;
2022
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
2024
xt_get_res_record_ref(&iref.ir_branch->tb_data[node->i_pos.i_item_offset + node->i_pos.i_item_size - XT_RECORD_REF_SIZE], &result);
2025
result.sr_item = node->i_pos;
2033
/* Quit the loop if the key is no longer matched! */
2034
if (myxt_compare_key(ind, 0, key_value->sv_length, key_value->sv_key, &iref.ir_branch->tb_data[result.sr_item.i_item_offset]) != 0) {
2035
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2039
if (ind->mi_lazy_delete) {
2040
if (result.sr_row_id == (xtRowID) -1)
2044
switch (xt_tab_maybe_committed(ot, result.sr_rec_id, &xn_id, NULL, NULL)) {
2046
/* Record is not committed, wait for the transaction. */
2047
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2048
XT_INDEX_UNLOCK(ind, ot);
2049
xw.xw_xn_id = xn_id;
2050
if (!xt_xn_wait_for_xact(ot->ot_thread, &xw, NULL)) {
2051
XT_INDEX_WRITE_LOCK(ind, ot);
2054
XT_INDEX_WRITE_LOCK(ind, ot);
2057
/* Error while reading... */
2060
/* Record is committed or belongs to me, duplicate key: */
2061
XT_DEBUG_TRACE(("DUPLICATE KEY tx=%d rec=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_xn_id, (int) result.sr_rec_id));
2062
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_DUPLICATE_KEY);
2065
/* Record is deleted or rolled-back: */
2070
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
2072
if (result.sr_item.i_node_ref_size) {
2073
/* Go down to the bottom: */
2074
while (XT_NODE_ID(current)) {
2075
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2076
if (!idx_push(&stack, current, &result.sr_item))
2078
current = result.sr_branch;
2079
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
2081
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
2082
if (!result.sr_item.i_node_ref_size)
2091
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2095
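/* Update search_key->sk_on_key: it stays TRUE only while the item at the
 * current scan position still compares equal to the original search key.
 */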
inline static void idx_still_on_key(XTIndexPtr ind, register XTIdxSearchKeyPtr search_key, register XTIdxBranchDPtr branch, register XTIdxItemPtr item)
2097
if (search_key && search_key->sk_on_key) {
2098
search_key->sk_on_key = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length,
2099
search_key->sk_key_value.sv_key, &branch->tb_data[item->i_item_offset]) == 0;
2104
* Insert a value into the given index. Return FALSE if an error occurs.
2106
xtPublic xtBool xt_idx_insert(XTOpenTablePtr ot, XTIndexPtr ind, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_buf, xtWord1 *bef_buf, xtBool allow_dups)
2108
XTIdxKeyValueRec key_value;
2109
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
2110
IdxBranchStackRec stack;
2111
xtIndexNodeID current;
2112
XTIndReferenceRec iref;
2113
xtIndexNodeID new_branch;
2114
XTIdxBranchDPtr new_branch_ptr;
2116
XTIdxResultRec result;
2118
xtBool check_for_dups = ind->mi_flags & (HA_UNIQUE_CHECK | HA_NOSAME) && !allow_dups;
2119
xtBool lock_structure = FALSE;
2120
xtBool updated = FALSE;
2121
#ifdef IND_OPT_DATA_WRITTEN
2127
iref.ir_updated = 2;
2129
#ifdef CHECK_AND_PRINT
2130
//idx_check_index(ot, ind, TRUE);
2134
#ifdef XT_TRACK_INDEX_UPDATES
2135
ot->ot_ind_changed = 0;
2137
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
2138
key_value.sv_rec_id = rec_id;
2139
key_value.sv_row_id = row_id; /* Should always be zero on insert (will be updated by the sweeper later).
2140
* Non-zero only during recovery, assuming that the sweeper will process such records right after recovery.
2142
key_value.sv_key = key_buf;
2143
key_value.sv_length = myxt_create_key_from_row(ind, key_buf, rec_buf, &check_for_dups);
2145
if (bef_buf && check_for_dups) {
2146
/* If we have a before image, and we are required to check for duplicates,
2147
* then compare the before image key with the after image key.
2149
xtWord1 bef_key_buf[XT_INDEX_MAX_KEY_SIZE];
2151
xtBool has_no_null = TRUE;
2153
len = myxt_create_key_from_row(ind, bef_key_buf, bef_buf, &has_no_null);
2155
/* If the before key has no null values, then compare with the after key value.
2156
* We only have to check for duplicates if the key has changed!
2158
check_for_dups = myxt_compare_key(ind, 0, len, bef_key_buf, key_buf) != 0;
2162
/* The index appears to have no root: */
2163
if (!XT_NODE_ID(ind->mi_root))
2164
lock_structure = TRUE;
2167
idx_newstack(&stack);
2169
/* A write lock is only required if we are going to change the
2170
* structure of the index!
2173
XT_INDEX_WRITE_LOCK(ind, ot);
2175
XT_INDEX_READ_LOCK(ind, ot);
2178
/* Create a root node if required: */
2179
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
2180
/* Index is empty, create a new one: */
2181
ASSERT_NS(lock_structure);
2182
if (!xt_ind_reserve(ot, 1, NULL))
2184
if (!idx_new_branch(ot, ind, &new_branch))
2186
size = idx_write_branch_item(ind, ot->ot_ind_wbuf.tb_data, &key_value);
2187
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_LEAF_SIZE(size));
2188
IDX_TRACE("%d-> %x\n", (int) new_branch, (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
2189
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + size, (xtWord1 *) &ot->ot_ind_wbuf))
2191
ind->mi_root = new_branch;
2195
/* Search down the tree for the insertion point. */
2196
#ifdef IND_SKEW_SPLIT_ON_APPEND
2197
result.sr_last_item = TRUE;
2199
while (XT_NODE_ID(current)) {
2200
if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_LEAF, &iref))
2202
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
2203
if (result.sr_duplicate) {
2204
if (check_for_dups) {
2205
/* Duplicates are not allowed, at least one has been found.
2209
/* Leaf nodes (i_node_ref_size == 0) are write locked,
2210
* non-leaf nodes are read locked.
2212
xt_ind_release(ot, ind, result.sr_item.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, &iref);
2214
if (!idx_check_duplicates(ot, ind, &key_value))
2216
/* We have checked all the "duplicate" variations. None of them are
2217
* relevant. So this will cause a correct insert.
2219
check_for_dups = FALSE;
2220
idx_newstack(&stack);
2224
if (result.sr_found) {
2225
/* Node found, can happen during recovery of indexes!
2226
* We have found an exact match of both key and record.
2228
XTPageUnlockType utype;
2229
xtBool overwrite = FALSE;
2231
/* {LAZY-DEL-INDEX-ITEMS}
2232
* If the item has been lazy deleted, then just overwrite!
2234
if (result.sr_row_id == (xtRowID) -1) {
2237
/* This is safe because we have an xlock on the leaf. */
2238
if ((del_count = iref.ir_block->cp_del_count))
2239
iref.ir_block->cp_del_count = del_count-1;
2243
if (!result.sr_row_id && row_id) {
2244
/* {INDEX-RECOV_ROWID} Set the row-id
2245
* during recovery, even if the index entry
2247
* It will be removed later by the sweeper.
2253
if (!idx_set_item_row_id(ot, ind, &iref, &result.sr_item, row_id))
2255
utype = result.sr_item.i_node_ref_size ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_W_UPDATE;
2258
utype = result.sr_item.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE;
2259
xt_ind_release(ot, ind, utype, &iref);
2262
/* Stop when we get to a leaf: */
2263
if (!result.sr_item.i_node_ref_size)
2265
xt_ind_release(ot, ind, result.sr_item.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, &iref);
2266
if (!idx_push(&stack, current, NULL))
2268
current = result.sr_branch;
2270
ASSERT_NS(XT_NODE_ID(current));
2272
/* Must be a leaf!: */
2273
ASSERT_NS(!result.sr_item.i_node_ref_size);
2276
if (ind->mi_lazy_delete && iref.ir_block->cp_del_count) {
2277
/* There are a number of possibilities:
2278
* - We could just replace a lazy deleted slot.
2279
* - We could compact and insert.
2280
* - We could just insert
2283
if (result.sr_item.i_item_offset > 0) {
2284
/* Check if it can go into the previous node: */
2285
XTIdxResultRec t_res;
2287
t_res.sr_item = result.sr_item;
2288
xt_prev_branch_item_fix(ot->ot_table, ind, iref.ir_branch, &t_res);
2289
if (t_res.sr_row_id != (xtRowID) -1)
2292
/* Yup, it can, but first check to see if it would be
2293
* better to put it in the current node.
2294
* This is the case if the previous node key is not the
2295
* same as the key we are adding...
2297
if (result.sr_item.i_item_offset < result.sr_item.i_total_size &&
2298
result.sr_row_id == (xtRowID) -1) {
2299
if (!idx_cmp_item_key_fix(&iref, &t_res.sr_item, &key_value))
2303
idx_set_item_key_fix(&iref, &t_res.sr_item, &key_value);
2304
iref.ir_block->cp_del_count--;
2305
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
2310
if (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
2311
if (result.sr_row_id == (xtRowID) -1) {
2312
idx_set_item_key_fix(&iref, &result.sr_item, &key_value);
2313
iref.ir_block->cp_del_count--;
2314
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
2319
/* Check if we must compact...
2320
* It makes no sense to split as long as there are lazy deleted items
2321
* in the page. So, delete them if a split would otherwise be required!
2323
ASSERT_NS(key_value.sv_length + XT_RECORD_REF_SIZE == result.sr_item.i_item_size);
2324
if (result.sr_item.i_total_size + key_value.sv_length + XT_RECORD_REF_SIZE > XT_INDEX_PAGE_DATA_SIZE) {
2325
if (!idx_compact_leaf(ot, ind, &iref, &result.sr_item))
2330
/* Fall through to the insert code... */
2331
/* NOTE: if there were no lazy deleted items in the leaf, then
2332
* idx_compact_leaf is a NOP. This is the only case in which it may not
2333
* fall through and do the insert below.
2335
* Normally, if the cp_del_count is correct then the insert
2336
* will work below, and the assertion here will not fail.
2338
* In this case, the xt_ind_release() will correctly indicate an update.
2340
ASSERT_NS(result.sr_item.i_total_size + key_value.sv_length + XT_RECORD_REF_SIZE <= XT_INDEX_PAGE_DATA_SIZE);
2343
if (result.sr_item.i_total_size + key_value.sv_length + XT_RECORD_REF_SIZE <= XT_INDEX_PAGE_DATA_SIZE) {
2344
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
2345
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
2346
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
2350
if (iref.ir_block->cb_handle_count) {
2351
if (!xt_ind_copy_on_write(&iref))
2355
idx_insert_leaf_item(ind, iref.ir_branch, &key_value, &result);
2356
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
2357
ASSERT_NS(result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE);
2358
#ifdef IND_OPT_DATA_WRITTEN
2359
iref.ir_block->cb_header = TRUE;
2360
if (result.sr_item.i_item_offset < iref.ir_block->cb_min_pos)
2361
iref.ir_block->cb_min_pos = result.sr_item.i_item_offset;
2362
iref.ir_block->cb_max_pos = result.sr_item.i_total_size;
2363
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
2364
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
2366
iref.ir_updated = TRUE;
2367
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
2371
/* Key does not fit. Must split the node.
2372
* Make sure we have a structural lock:
2374
if (!lock_structure) {
2375
xt_ind_release(ot, ind, updated ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_WRITE, &iref);
2376
XT_INDEX_UNLOCK(ind, ot);
2377
lock_structure = TRUE;
2378
goto lock_and_retry;
2381
memcpy(&ot->ot_ind_wbuf, iref.ir_branch, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_total_size);
2382
idx_insert_leaf_item(ind, &ot->ot_ind_wbuf, &key_value, &result);
2383
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
2384
ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE && result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE*2);
2385
#ifdef IND_OPT_DATA_WRITTEN
2386
new_min_pos = result.sr_item.i_item_offset;
2389
/* This is the number of potential writes. In other words, the total number
2390
* of blocks that may be accessed.
2392
* Note that this assumes that if a block is read and written soon after, the block
2393
* will not be freed in-between (a safe assumption?)
2395
if (!xt_ind_reserve(ot, stack.s_top * 2 + 3, iref.ir_branch))
2398
/* Key does not fit, must split... */
2399
if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, &key_value, &result))
2402
if (!idx_new_branch(ot, ind, &new_branch))
2405
/* Copy and write the rest of the data to the new node: */
2406
new_size = result.sr_item.i_total_size - result.sr_item.i_item_offset - result.sr_item.i_item_size;
2407
new_branch_ptr = (XTIdxBranchDPtr) &ot->ot_ind_wbuf.tb_data[XT_INDEX_PAGE_DATA_SIZE];
2408
memmove(new_branch_ptr->tb_data, &ot->ot_ind_wbuf.tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size], new_size);
2410
XT_SET_DISK_2(new_branch_ptr->tb_size_2, XT_MAKE_LEAF_SIZE(new_size));
2411
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(new_branch_ptr->tb_size_2));
2412
if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + new_size, (xtWord1 *) new_branch_ptr))
2415
/* Modify the first node: */
2416
XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_LEAF_SIZE(result.sr_item.i_item_offset));
2417
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
2419
if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
2420
ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
2421
if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
2425
if (iref.ir_block->cb_handle_count) {
2426
if (!xt_ind_copy_on_write(&iref))
2429
#ifdef IND_OPT_DATA_WRITTEN
2430
if (result.sr_item.i_item_offset < new_min_pos)
2431
new_min_pos = result.sr_item.i_item_offset;
2433
memcpy(iref.ir_branch, &ot->ot_ind_wbuf, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_item_offset);
2434
#ifdef IND_OPT_DATA_WRITTEN
2435
iref.ir_block->cb_header = TRUE;
2436
if (new_min_pos < iref.ir_block->cb_min_pos)
2437
iref.ir_block->cb_min_pos = new_min_pos;
2438
iref.ir_block->cb_max_pos = result.sr_item.i_item_offset;
2439
ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
2440
ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
2442
iref.ir_updated = TRUE;
2443
xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
2445
/* Insert the new branch into the parent node, using the new middle key value: */
2446
if (!idx_insert_node(ot, ind, &stack, result.sr_last_item, &key_value, new_branch)) {
2447
// Index may be inconsistent now...
2448
idx_free_branch(ot, ind, new_branch);
2452
#ifdef XT_TRACK_INDEX_UPDATES
2453
ASSERT_NS(ot->ot_ind_reserved >= ot->ot_ind_reads);
2457
XT_INDEX_UNLOCK(ind, ot);
2460
//printf("INSERT OK\n");
2461
//idx_check_index(ot, ind, TRUE);
2463
xt_ind_unreserve(ot);
2467
idx_free_branch(ot, ind, new_branch);
2470
xt_ind_release(ot, ind, updated ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_WRITE, &iref);
2473
XT_INDEX_UNLOCK(ind, ot);
2474
if (idx_out_of_memory_failure(ot))
2475
goto retry_after_oom;
2478
//printf("INSERT FAILED\n");
2479
//idx_check_index(ot, ind, TRUE);
2481
xt_ind_unreserve(ot);
2486
/* Remove the given item in the node.
2487
* This is done by going down the tree to find a replacement
2488
* for the deleted item!
2490
static xtBool idx_remove_item_in_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackPtr stack, XTIndReferencePtr iref, XTIdxKeyValuePtr key_value)
2492
IdxStackItemPtr delete_node;
2493
XTIdxResultRec result;
2494
xtIndexNodeID current;
2495
xtBool lazy_delete_cleanup_required = FALSE;
2496
IdxStackItemPtr current_top;
2498
delete_node = idx_top(stack);
2499
current = delete_node->i_branch;
2500
result.sr_item = delete_node->i_pos;
2502
/* Follow the branch after this item: */
2503
idx_next_branch_item(ot->ot_table, ind, iref->ir_branch, &result);
2504
xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, iref);
2506
/* Go down the left-hand side until we reach a leaf: */
2507
while (XT_NODE_ID(current)) {
2508
current = result.sr_branch;
2509
if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_LEAF, iref))
2511
idx_first_branch_item(ot->ot_table, ind, iref->ir_branch, &result);
2512
if (!result.sr_item.i_node_ref_size)
2514
xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
2515
if (!idx_push(stack, current, &result.sr_item))
2519
ASSERT_NS(XT_NODE_ID(current));
2520
ASSERT_NS(!result.sr_item.i_node_ref_size);
2522
if (!xt_ind_reserve(ot, stack->s_top + 2, iref->ir_branch)) {
2523
xt_ind_release(ot, ind, XT_UNLOCK_WRITE, iref);
2527
/* This code removes lazy deleted items from the leaf,
2528
* before we promote an item from the leaf to the node above.
2529
* This is not essential, but prevents lazy deleted
2530
* items from being propagated up the tree.
2532
if (ind->mi_lazy_delete) {
2533
if (iref->ir_block->cp_del_count) {
2534
if (!idx_compact_leaf(ot, ind, iref, &result.sr_item))
2539
/* Crawl back up the stack, looking for a key
2540
* that can be used to replace the deleted key.
2542
* Any empty nodes on the way up can be removed!
2544
if (result.sr_item.i_total_size > 0) {
2545
/* There is a key in the leaf, extract it, and put it in the node: */
2546
memcpy(key_value->sv_key, &iref->ir_branch->tb_data[result.sr_item.i_item_offset], result.sr_item.i_item_size);
2547
/* This call also frees the iref.ir_branch page! */
2548
if (!idx_remove_branch_item_right(ot, ind, current, iref, &result.sr_item))
2550
if (!idx_replace_node_key(ot, ind, delete_node, stack, result.sr_item.i_item_size, key_value->sv_key))
2555
xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_WRITE, iref);
2558
/* The current node/leaf is empty, remove it: */
2559
idx_free_branch(ot, ind, current);
2561
current_top = idx_pop(stack);
2562
current = current_top->i_branch;
2563
if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_LEAF, iref))
2566
if (current_top == delete_node) {
2567
/* All children have been removed. Delete the key and done: */
2568
if (!idx_remove_branch_item_right(ot, ind, current, iref, &current_top->i_pos))
2573
if (current_top->i_pos.i_total_size > current_top->i_pos.i_node_ref_size) {
2575
memcpy(key_value->sv_key, &iref->ir_branch->tb_data[current_top->i_pos.i_item_offset], current_top->i_pos.i_item_size);
2576
/* This function also frees the cache page: */
2577
if (!idx_remove_branch_item_left(ot, ind, current, iref, &current_top->i_pos, &lazy_delete_cleanup_required))
2579
if (!idx_replace_node_key(ot, ind, delete_node, stack, current_top->i_pos.i_item_size, key_value->sv_key))
2582
if (lazy_delete_cleanup_required) {
2583
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, iref))
2585
if (!idx_remove_lazy_deleted_item_in_node(ot, ind, current, iref, key_value))
2590
xt_ind_release(ot, ind, current_top->i_pos.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, iref);
2594
#ifdef XT_TRACK_INDEX_UPDATES
2595
ASSERT_NS(ot->ot_ind_reserved >= ot->ot_ind_reads);
2601
* This function assumes we have a lock on the structure of the index.
2603
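/* Remove all lazy deleted items (row ID == -1) from the given node.
 * Each such item is replaced by promoting a key from the leaf level
 * (idx_remove_item_in_node); the node is then re-fetched and re-scanned
 * until no lazy deleted items remain.
 */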
static xtBool idx_remove_lazy_deleted_item_in_node(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID current, XTIndReferencePtr iref, XTIdxKeyValuePtr key_value)
2605
IdxBranchStackRec stack;
2606
XTIdxResultRec result;
2608
/* Now remove all lazy deleted items in this node.... */
2609
idx_first_branch_item(ot->ot_table, ind, (XTIdxBranchDPtr) iref->ir_block->cb_data, &result);
2612
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
2613
if (result.sr_row_id == (xtRowID) -1)
2615
idx_next_branch_item(ot->ot_table, ind, (XTIdxBranchDPtr) iref->ir_block->cb_data, &result);
2621
idx_newstack(&stack);
2622
if (!idx_push(&stack, current, &result.sr_item)) {
2623
xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, iref);
2627
if (!idx_remove_item_in_node(ot, ind, &stack, iref, key_value))
2630
/* Go back up to the node we are trying to clean up: */
2633
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, iref))
2635
/* Load the data again: */
2636
idx_reload_item_fix(ind, iref->ir_branch, &result);
2639
xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, iref);
2643
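/* Find and remove key_value from the index.
 * Removal from a leaf can be done lazily (the item is only marked deleted)
 * or physically. Removal from an interior node requires the structural
 * lock, because the deleted key must be replaced by one promoted from a
 * leaf (idx_remove_item_in_node).
 */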
static xtBool idx_delete(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxKeyValuePtr key_value)
2645
IdxBranchStackRec stack;
2646
xtIndexNodeID current;
2647
XTIndReferenceRec iref;
2648
XTIdxResultRec result;
2649
xtBool lock_structure = FALSE;
2653
iref.ir_updated = 2;
2655
/* The index appears to have no root: */
2656
if (!XT_NODE_ID(ind->mi_root))
2657
lock_structure = TRUE;
2660
idx_newstack(&stack);
2663
XT_INDEX_WRITE_LOCK(ind, ot);
2665
XT_INDEX_READ_LOCK(ind, ot);
2667
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
2670
while (XT_NODE_ID(current)) {
2671
if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_DEL_LEAF, &iref))
2673
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, key_value, &result);
2674
if (!result.sr_item.i_node_ref_size) {
2676
if (result.sr_found) {
2677
if (ind->mi_lazy_delete) {
2678
/* If we have a W lock, then fetch decided that we
2679
* need to compact the page.
2680
* The decision is made by xt_idx_lazy_delete_on_leaf()
2682
if (!iref.ir_xlock) {
2683
if (!idx_lazy_delete_branch_item(ot, ind, &iref, &result.sr_item))
2687
if (!iref.ir_block->cp_del_count) {
2688
if (!idx_remove_branch_item_right(ot, ind, current, &iref, &result.sr_item))
2692
if (!idx_lazy_remove_leaf_item_right(ot, ind, &iref, &result.sr_item))
2698
if (!idx_remove_branch_item_right(ot, ind, current, &iref, &result.sr_item))
2703
xt_ind_release(ot, ind, iref.ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, &iref);
2706
if (!idx_push(&stack, current, &result.sr_item)) {
2707
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2710
if (result.sr_found)
2711
/* If we have found the key in a node: */
2713
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2714
current = result.sr_branch;
2717
/* Must be a non-leaf!: */
2718
ASSERT_NS(result.sr_item.i_node_ref_size);
2720
if (ind->mi_lazy_delete) {
2721
if (!idx_lazy_delete_on_node(ind, iref.ir_block, &result.sr_item)) {
2722
/* We need to remove some items from this node: */
2724
if (!lock_structure) {
2725
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2726
XT_INDEX_UNLOCK(ind, ot);
2727
lock_structure = TRUE;
2728
goto lock_and_retry;
2731
if (!idx_set_item_deleted(ot, ind, &iref, &result.sr_item))
2733
if (!idx_remove_lazy_deleted_item_in_node(ot, ind, current, &iref, key_value))
2738
if (!ot->ot_table->tab_dic.dic_no_lazy_delete) {
2739
/* {LAZY-DEL-INDEX-ITEMS}
2740
* We just set the item to deleted, which is a significant time saver.
2742
* But this item can only be cleaned up when all
2743
* items on the node below are deleted.
2745
if (!idx_lazy_delete_branch_item(ot, ind, &iref, &result.sr_item))
2751
/* We will have to remove the key from a non-leaf node,
2752
* which means we are changing the structure of the index.
2753
* Make sure we have a structural lock:
2755
if (!lock_structure) {
2756
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2757
XT_INDEX_UNLOCK(ind, ot);
2758
lock_structure = TRUE;
2759
goto lock_and_retry;
2762
/* This is the item we will have to replace: */
2763
if (!idx_remove_item_in_node(ot, ind, &stack, &iref, key_value))
2767
XT_INDEX_UNLOCK(ind, ot);
2770
//printf("DELETE OK\n");
2771
//idx_check_index(ot, ind, TRUE);
2773
xt_ind_unreserve(ot);
2777
XT_INDEX_UNLOCK(ind, ot);
2778
xt_ind_unreserve(ot);
2782
xtPublic xtBool xt_idx_delete(XTOpenTablePtr ot, XTIndexPtr ind, xtRecordID rec_id, xtWord1 *rec_buf)
2784
XTIdxKeyValueRec key_value;
2785
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE + XT_MAX_RECORD_REF_SIZE];
2788
#ifdef XT_TRACK_INDEX_UPDATES
2789
ot->ot_ind_changed = 0;
2792
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
2793
key_value.sv_rec_id = rec_id;
2794
key_value.sv_row_id = 0;
2795
key_value.sv_key = key_buf;
2796
key_value.sv_length = myxt_create_key_from_row(ind, key_buf, rec_buf, NULL);
2798
if (!idx_delete(ot, ind, &key_value)) {
2799
if (idx_out_of_memory_failure(ot))
2800
goto retry_after_oom;
2806
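/* Set the row ID of an existing index entry, identified by rec_id and the
 * key built from rec_buf. Used by the sweeper; see the locking notes below.
 */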
xtPublic xtBool xt_idx_update_row_id(XTOpenTablePtr ot, XTIndexPtr ind, xtRecordID rec_id, xtRowID row_id, xtWord1 *rec_buf)
2808
xtIndexNodeID current;
2809
XTIndReferenceRec iref;
2810
XTIdxResultRec result;
2811
XTIdxKeyValueRec key_value;
2812
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE + XT_MAX_RECORD_REF_SIZE];
2816
iref.ir_updated = 2;
2818
#ifdef CHECK_AND_PRINT
2819
idx_check_index(ot, ind, TRUE);
2822
#ifdef XT_TRACK_INDEX_UPDATES
2823
ot->ot_ind_changed = 0;
2825
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
2826
key_value.sv_rec_id = rec_id;
2827
key_value.sv_row_id = 0;
2828
key_value.sv_key = key_buf;
2829
key_value.sv_length = myxt_create_key_from_row(ind, key_buf, rec_buf, NULL);
2831
/* NOTE: Only a read lock is required for this!!
2833
* 09.05.2008 - This has changed because the dirty list now
2834
* hangs on the index. And the dirty list may be updated
2835
* by any change of the index.
2836
* However, the advantage is that I should be able to read
2837
* lock in the first phase of the flush.
2839
* 18.02.2009 - This has changed again.
2840
* I am now using a read lock, because this update does not
2841
* require a structural change. In fact, it does not even
2842
* need a WRITE LOCK on the page affected, because there
2843
* is only ONE thread that can do this (the sweeper).
2845
* This has the advantage that the sweeper (which uses this
2846
* function) causes fewer conflicts.
2848
* However, it does mean that the dirty list must be otherwise
2849
* protected (which it now is, by a spin lock - mi_dirty_lock).
2851
* It also has the disadvantage that I am going to have to
2852
* take an xlock in the first phase of the flush.
2854
XT_INDEX_READ_LOCK(ind, ot);
2856
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
2859
while (XT_NODE_ID(current)) {
2860
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
2862
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
2863
if (result.sr_found || !result.sr_item.i_node_ref_size)
2865
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2866
current = result.sr_branch;
2869
if (result.sr_found) {
2870
/* TODO: Check that concurrent reads can handle this!
2871
* assuming the write is not atomic.
2873
if (!idx_set_item_row_id(ot, ind, &iref, &result.sr_item, row_id))
2875
xt_ind_release(ot, ind, XT_UNLOCK_R_UPDATE, &iref);
2878
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2881
XT_INDEX_UNLOCK(ind, ot);
2884
//idx_check_index(ot, ind, TRUE);
2885
//idx_check_on_key(ot);
2890
XT_INDEX_UNLOCK(ind, ot);
2891
if (idx_out_of_memory_failure(ot))
2892
goto retry_after_oom;
2896
xtPublic void xt_idx_prep_key(XTIndexPtr ind, register XTIdxSearchKeyPtr search_key, int flags, xtWord1 *in_key_buf, size_t in_key_length)
2898
search_key->sk_key_value.sv_flags = flags;
2899
search_key->sk_key_value.sv_rec_id = 0;
2900
search_key->sk_key_value.sv_row_id = 0;
2901
search_key->sk_key_value.sv_key = search_key->sk_key_buf;
2902
search_key->sk_key_value.sv_length = myxt_create_key_from_key(ind, search_key->sk_key_buf, in_key_buf, in_key_length);
2903
search_key->sk_on_key = FALSE;
2906
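/* Repeat the current index search: rebuild a whole-key search key from the
 * entry referenced by the open read handle and call xt_idx_search() again.
 */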
xtPublic xtBool xt_idx_research(XTOpenTablePtr ot, XTIndexPtr ind)
2908
XTIdxSearchKeyRec search_key;
2910
xt_ind_lock_handle(ot->ot_ind_rhandle);
2911
search_key.sk_key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
2912
xt_get_record_ref(&ot->ot_ind_rhandle->ih_branch->tb_data[ot->ot_ind_state.i_item_offset + ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE],
2913
&search_key.sk_key_value.sv_rec_id, &search_key.sk_key_value.sv_row_id);
2914
search_key.sk_key_value.sv_key = search_key.sk_key_buf;
2915
search_key.sk_key_value.sv_length = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
2916
search_key.sk_on_key = FALSE;
2917
memcpy(search_key.sk_key_buf, &ot->ot_ind_rhandle->ih_branch->tb_data[ot->ot_ind_state.i_item_offset], search_key.sk_key_value.sv_length);
2918
xt_ind_unlock_handle(ot->ot_ind_rhandle);
2919
return xt_idx_search(ot, ind, &search_key);
2923
* Search for a given key and position the current pointer on the first
2924
* key in the list of duplicates. If the key is not found, the current
2925
* pointer is placed at the first position after the key.
2927
xtPublic xtBool xt_idx_search(XTOpenTablePtr ot, XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
2929
IdxBranchStackRec stack;
2930
xtIndexNodeID current;
2931
XTIndReferenceRec iref;
2932
XTIdxResultRec result;
2936
iref.ir_updated = 2;
2938
if (ot->ot_ind_rhandle) {
2939
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, ot->ot_thread);
2940
ot->ot_ind_rhandle = NULL;
2943
//idx_check_index(ot, ind, TRUE);
2946
/* Calling from recovery, this is not the case.
2947
* But the index read does not require a transaction!
2948
* Only insert requires this to check for duplicates.
2949
if (!ot->ot_thread->st_xact_data) {
2950
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
2956
#ifdef XT_TRACK_INDEX_UPDATES
2957
ot->ot_ind_changed = 0;
2959
idx_newstack(&stack);
2961
ot->ot_curr_rec_id = 0;
2962
ot->ot_curr_row_id = 0;
2964
XT_INDEX_READ_LOCK(ind, ot);
2966
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
2969
while (XT_NODE_ID(current)) {
2970
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
2972
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &search_key->sk_key_value, &result);
2973
if (result.sr_found)
2974
/* If we have found the key in a node: */
2975
search_key->sk_on_key = TRUE;
2976
if (!result.sr_item.i_node_ref_size)
2978
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
2979
if (!idx_push(&stack, current, &result.sr_item))
2981
current = result.sr_branch;
2984
if (ind->mi_lazy_delete) {
2985
ignore_lazy_deleted_items:
2986
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
2987
if (result.sr_row_id != (xtRowID) -1) {
2988
idx_still_on_key(ind, search_key, iref.ir_branch, &result.sr_item);
2991
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
2995
if (result.sr_item.i_item_offset == result.sr_item.i_total_size) {
2996
IdxStackItemPtr node;
2998
/* We are at the end of a leaf node.
2999
* Go up the stack to find the start position of the next key.
3000
* If we find none, then we are at the end of the index.
3002
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3003
while ((node = idx_pop(&stack))) {
3004
if (node->i_pos.i_item_offset < node->i_pos.i_total_size) {
3005
if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
3007
xt_get_res_record_ref(&iref.ir_branch->tb_data[node->i_pos.i_item_offset + node->i_pos.i_item_size - XT_RECORD_REF_SIZE], &result);
3009
if (ind->mi_lazy_delete) {
3010
result.sr_item = node->i_pos;
3011
if (result.sr_row_id == (xtRowID) -1) {
3012
/* If this node pointer is lazy deleted, then
3013
* go down the next branch...
3015
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3017
/* Go down to the bottom: */
3018
current = node->i_branch;
3019
while (XT_NODE_ID(current)) {
3020
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3021
if (!idx_push(&stack, current, &result.sr_item))
3023
current = result.sr_branch;
3024
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3026
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3027
if (!result.sr_item.i_node_ref_size)
3031
goto ignore_lazy_deleted_items;
3033
idx_still_on_key(ind, search_key, iref.ir_branch, &result.sr_item);
3036
ot->ot_curr_rec_id = result.sr_rec_id;
3037
ot->ot_curr_row_id = result.sr_row_id;
3038
ot->ot_ind_state = node->i_pos;
3040
/* Convert the pointer to a handle which can be used in later operations: */
3041
ASSERT_NS(!ot->ot_ind_rhandle);
3042
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3044
/* Keep the node for next operations: */
3046
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3047
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3048
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3055
ot->ot_curr_rec_id = result.sr_rec_id;
3056
ot->ot_curr_row_id = result.sr_row_id;
3057
ot->ot_ind_state = result.sr_item;
3059
/* Convert the pointer to a handle which can be used in later operations: */
3060
ASSERT_NS(!ot->ot_ind_rhandle);
3061
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3063
/* Keep the node for next operations: */
3065
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3066
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3067
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3072
XT_INDEX_UNLOCK(ind, ot);
3075
//idx_check_index(ot, ind, TRUE);
3076
//idx_check_on_key(ot);
3078
ASSERT_NS(iref.ir_xlock == 2);
3079
ASSERT_NS(iref.ir_updated == 2);
3083
XT_INDEX_UNLOCK(ind, ot);
3084
if (idx_out_of_memory_failure(ot))
3085
goto retry_after_oom;
3086
ASSERT_NS(iref.ir_xlock == 2);
3087
ASSERT_NS(iref.ir_updated == 2);
3091
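/* Like xt_idx_search(), but position the cursor on the entry immediately
 * before the search position, walking back up the branch stack when the
 * search lands at the start of a leaf.
 */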
xtPublic xtBool xt_idx_search_prev(XTOpenTablePtr ot, XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
3093
IdxBranchStackRec stack;
3094
xtIndexNodeID current;
3095
XTIndReferenceRec iref;
3096
XTIdxResultRec result;
3100
iref.ir_updated = 2;
3102
if (ot->ot_ind_rhandle) {
3103
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, ot->ot_thread);
3104
ot->ot_ind_rhandle = NULL;
3107
//idx_check_index(ot, ind, TRUE);
3110
/* see the comment above in xt_idx_search */
3112
if (!ot->ot_thread->st_xact_data) {
3113
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
3119
#ifdef XT_TRACK_INDEX_UPDATES
3120
ot->ot_ind_changed = 0;
3122
idx_newstack(&stack);
3124
ot->ot_curr_rec_id = 0;
3125
ot->ot_curr_row_id = 0;
3127
XT_INDEX_READ_LOCK(ind, ot);
3129
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
3132
while (XT_NODE_ID(current)) {
3133
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3135
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &search_key->sk_key_value, &result);
3136
if (result.sr_found)
3137
/* If we have found the key in a node: */
3138
search_key->sk_on_key = TRUE;
3139
if (!result.sr_item.i_node_ref_size)
3141
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3142
if (!idx_push(&stack, current, &result.sr_item))
3144
current = result.sr_branch;
3147
if (result.sr_item.i_item_offset == 0) {
3148
IdxStackItemPtr node;
3151
/* We are at the start of a leaf node.
3152
* Go up the stack to find the start position of the next key.
3153
* If we find none, then we are at the end of the index.
3155
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3156
while ((node = idx_pop(&stack))) {
3157
if (node->i_pos.i_item_offset > node->i_pos.i_node_ref_size) {
3158
if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
3160
result.sr_item = node->i_pos;
3161
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3163
if (ind->mi_lazy_delete) {
3164
if (result.sr_row_id == (xtRowID) -1) {
3165
/* Go down to the bottom, in order to scan the leaf backwards: */
3166
current = node->i_branch;
3167
while (XT_NODE_ID(current)) {
3168
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3169
if (!idx_push(&stack, current, &result.sr_item))
3171
current = result.sr_branch;
3172
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3174
ind->mi_last_item(ot->ot_table, ind, iref.ir_branch, &result);
3175
if (!result.sr_item.i_node_ref_size)
3179
/* If the leaf is empty, we have to go up the stack again... */
3180
if (result.sr_item.i_total_size == 0)
3181
goto search_up_stack;
3183
goto scan_back_in_leaf;
3193
/* We must just step once to the left in this leaf node... */
3194
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3196
if (ind->mi_lazy_delete) {
3198
while (result.sr_row_id == (xtRowID) -1) {
3199
if (result.sr_item.i_item_offset == 0)
3200
goto search_up_stack;
3201
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3203
idx_still_on_key(ind, search_key, iref.ir_branch, &result.sr_item);
3207
ot->ot_curr_rec_id = result.sr_rec_id;
3208
ot->ot_curr_row_id = result.sr_row_id;
3209
ot->ot_ind_state = result.sr_item;
3211
/* Convert to handle for later operations: */
3212
ASSERT_NS(!ot->ot_ind_rhandle);
3213
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3215
/* Keep a copy of the node for previous operations... */
3219
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3220
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3221
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3225
XT_INDEX_UNLOCK(ind, ot);
3228
//idx_check_index(ot, ind, TRUE);
3229
//idx_check_on_key(ot);
3234
XT_INDEX_UNLOCK(ind, ot);
3235
if (idx_out_of_memory_failure(ot))
3236
goto retry_after_oom;
3241
* Copy the current index value to the record.
3243
xtPublic xtBool xt_idx_read(XTOpenTablePtr ot, XTIndexPtr ind, xtWord1 *rec_buf)
3248
//idx_check_on_key(ot);
3250
xt_ind_lock_handle(ot->ot_ind_rhandle);
3251
bitem = ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset;
3252
myxt_create_row_from_key(ot, ind, bitem, ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE, rec_buf);
3253
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3257
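/* Move to the next index entry.
 * Fast path: if the cached read handle still references the leaf and a
 * following item exists, step forward within the handle. Otherwise the
 * current key is copied out, the handle is released and the position is
 * re-established by searching from the root.
 */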
xtPublic xtBool xt_idx_next(register XTOpenTablePtr ot, register XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
3259
XTIdxKeyValueRec key_value;
3260
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
3261
XTIdxResultRec result;
3262
IdxBranchStackRec stack;
3263
xtIndexNodeID current;
3264
XTIndReferenceRec iref;
3268
iref.ir_updated = 2;
3270
ASSERT_NS(ot->ot_ind_rhandle);
3271
xt_ind_lock_handle(ot->ot_ind_rhandle);
3272
result.sr_item = ot->ot_ind_state;
3273
if (!result.sr_item.i_node_ref_size &&
3274
result.sr_item.i_item_offset < result.sr_item.i_total_size &&
3275
ot->ot_ind_rhandle->ih_cache_reference) {
3276
XTIdxItemRec prev_item;
3278
key_value.sv_key = &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset];
3279
key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
3281
prev_item = result.sr_item;
3282
idx_next_branch_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
3284
if (ind->mi_lazy_delete) {
3285
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
3286
if (result.sr_row_id != (xtRowID) -1)
3288
prev_item = result.sr_item;
3289
idx_next_branch_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
3293
if (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
3295
idx_still_on_key(ind, search_key, ot->ot_ind_rhandle->ih_branch, &result.sr_item);
3296
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3297
goto checked_on_key;
3300
result.sr_item = prev_item;
3303
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
3304
xt_get_record_ref(&ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size - XT_RECORD_REF_SIZE], &key_value.sv_rec_id, &key_value.sv_row_id);
3305
key_value.sv_key = key_buf;
3306
key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
3307
memcpy(key_buf, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset], key_value.sv_length);
3308
xt_ind_release_handle(ot->ot_ind_rhandle, TRUE, ot->ot_thread);
3309
ot->ot_ind_rhandle = NULL;
3312
#ifdef XT_TRACK_INDEX_UPDATES
3313
ot->ot_ind_changed = 0;
3315
idx_newstack(&stack);
3317
XT_INDEX_READ_LOCK(ind, ot);
3319
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
3320
XT_INDEX_UNLOCK(ind, ot);
3324
while (XT_NODE_ID(current)) {
3325
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3327
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
3328
if (result.sr_item.i_node_ref_size) {
3329
if (result.sr_found) {
3330
/* If we have found the key in a node: */
3331
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3333
/* Go down to the bottom: */
3334
while (XT_NODE_ID(current)) {
3335
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3336
if (!idx_push(&stack, current, &result.sr_item))
3338
current = result.sr_branch;
3339
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3341
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3342
if (!result.sr_item.i_node_ref_size)
3346
/* If the leaf is not empty, then we are done... */
3351
/* We have reached the leaf. */
3352
if (result.sr_found)
3353
/* If we have found the key in a leaf: */
3354
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3355
/* If we did not find the key (although we should have), our
3356
* position is automatically the next one.
3360
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3361
if (!idx_push(&stack, current, &result.sr_item))
3363
current = result.sr_branch;
3366
if (ind->mi_lazy_delete) {
3367
ignore_lazy_deleted_items:
3368
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
3369
if (result.sr_row_id != (xtRowID) -1)
3371
idx_next_branch_item(NULL, ind, iref.ir_branch, &result);
3375
/* Check the current position in a leaf: */
3376
if (result.sr_item.i_item_offset == result.sr_item.i_total_size) {
3378
IdxStackItemPtr node;
3380
/* We are at the end of a leaf node.
3381
* Go up the stack to find the start position of the next key.
3382
* If we find none, then we are at the end of the index.
3384
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3385
while ((node = idx_pop(&stack))) {
3386
if (node->i_pos.i_item_offset < node->i_pos.i_total_size) {
3387
if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
3389
result.sr_item = node->i_pos;
3390
xt_get_res_record_ref(&iref.ir_branch->tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size - XT_RECORD_REF_SIZE], &result);
3392
if (ind->mi_lazy_delete) {
3393
if (result.sr_row_id == (xtRowID) -1) {
3394
/* If this node pointer is lazy deleted, then
3395
* go down the next branch...
3397
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3399
/* Go down to the bottom: */
3400
current = node->i_branch;
3401
while (XT_NODE_ID(current)) {
3402
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3403
if (!idx_push(&stack, current, &result.sr_item))
3405
current = result.sr_branch;
3406
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3408
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
3409
if (!result.sr_item.i_node_ref_size)
3413
/* And scan the leaf... */
3414
goto ignore_lazy_deleted_items;
3418
goto unlock_check_on_key;
3424
search_key->sk_on_key = FALSE;
3425
ot->ot_curr_rec_id = 0;
3426
ot->ot_curr_row_id = 0;
3427
XT_INDEX_UNLOCK(ind, ot);
3431
unlock_check_on_key:
3433
ASSERT_NS(!ot->ot_ind_rhandle);
3434
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3439
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3440
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3441
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3444
XT_INDEX_UNLOCK(ind, ot);
3447
if (search_key && search_key->sk_on_key) {
3448
/* GOTCHA: As a short-cut I was using a length compare
3449
* and a memcmp() here to check whether we are still on
3450
* the original search key.
3451
* This does not work because it does not take into account
3452
* trailing spaces (which are ignored in comparison).
3453
* So lengths can be different, but values still equal.
3455
* NOTE: We have to use the original search flags for this comparison.
3458
xt_ind_lock_handle(ot->ot_ind_rhandle);
3459
search_key->sk_on_key = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length,
3460
search_key->sk_key_value.sv_key, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset]) == 0;
3461
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3465
ot->ot_curr_rec_id = result.sr_rec_id;
3466
ot->ot_curr_row_id = result.sr_row_id;
3467
ot->ot_ind_state = result.sr_item;
3472
XT_INDEX_UNLOCK(ind, ot);
3473
if (idx_out_of_memory_failure(ot))
3474
goto retry_after_oom;
3478
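/* Move to the previous index entry.
 * If the handle's leaf has an item before the current one, step back in
 * place; otherwise copy out the current key, release the handle and
 * re-search from the root, using the branch stack to step backwards.
 */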
xtPublic xtBool xt_idx_prev(register XTOpenTablePtr ot, register XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
3480
XTIdxKeyValueRec key_value;
3481
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
3482
XTIdxResultRec result;
3483
IdxBranchStackRec stack;
3484
xtIndexNodeID current;
3485
XTIndReferenceRec iref;
3486
IdxStackItemPtr node;
3490
iref.ir_updated = 2;
3492
ASSERT_NS(ot->ot_ind_rhandle);
3493
xt_ind_lock_handle(ot->ot_ind_rhandle);
3494
result.sr_item = ot->ot_ind_state;
3495
if (!result.sr_item.i_node_ref_size && result.sr_item.i_item_offset > 0) {
3496
key_value.sv_key = &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset];
3497
key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
3499
ind->mi_prev_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
3501
if (ind->mi_lazy_delete) {
3502
while (result.sr_row_id == (xtRowID) -1) {
3503
if (result.sr_item.i_item_offset == 0)
3505
ind->mi_prev_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
3509
idx_still_on_key(ind, search_key, ot->ot_ind_rhandle->ih_branch, &result.sr_item);
3511
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3512
goto checked_on_key;
3516
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
3517
key_value.sv_rec_id = ot->ot_curr_rec_id;
3518
key_value.sv_row_id = 0;
3519
key_value.sv_key = key_buf;
3520
key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
3521
memcpy(key_buf, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset], key_value.sv_length);
3522
xt_ind_release_handle(ot->ot_ind_rhandle, TRUE, ot->ot_thread);
3523
ot->ot_ind_rhandle = NULL;
3526
#ifdef XT_TRACK_INDEX_UPDATES
3527
ot->ot_ind_changed = 0;
3529
idx_newstack(&stack);
3531
XT_INDEX_READ_LOCK(ind, ot);
3533
if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
3534
XT_INDEX_UNLOCK(ind, ot);
3538
while (XT_NODE_ID(current)) {
3539
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3541
ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
3542
if (result.sr_item.i_node_ref_size) {
3543
if (result.sr_found) {
3544
/* If we have found the key in a node: */
3547
/* Go down to the bottom: */
3548
while (XT_NODE_ID(current)) {
3549
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3550
if (!idx_push(&stack, current, &result.sr_item))
3552
current = result.sr_branch;
3553
if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
3555
ind->mi_last_item(ot->ot_table, ind, iref.ir_branch, &result);
3556
if (!result.sr_item.i_node_ref_size)
3560
/* If the leaf is empty, we have to go up the stack again... */
3561
if (result.sr_item.i_total_size == 0)
3564
if (ind->mi_lazy_delete) {
3565
while (result.sr_row_id == (xtRowID) -1) {
3566
if (result.sr_item.i_item_offset == 0)
3567
goto search_up_stack;
3568
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3572
goto unlock_check_on_key;
3576
/* We have reached the leaf.
3577
* Whether we found the key or not, we have
3578
* to move one to the left.
3580
if (result.sr_item.i_item_offset == 0)
3582
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3584
if (ind->mi_lazy_delete) {
3585
while (result.sr_row_id == (xtRowID) -1) {
3586
if (result.sr_item.i_item_offset == 0)
3587
goto search_up_stack;
3588
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3592
goto unlock_check_on_key;
3594
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3595
if (!idx_push(&stack, current, &result.sr_item))
3597
current = result.sr_branch;
3601
/* We are at the start of a leaf node.
3602
* Go up the stack to find the start position of the next key.
3603
* If we find none, then we are at the end of the index.
3605
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3606
while ((node = idx_pop(&stack))) {
3607
if (node->i_pos.i_item_offset > node->i_pos.i_node_ref_size) {
3608
if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
3610
result.sr_item = node->i_pos;
3611
ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
3613
if (ind->mi_lazy_delete) {
3614
if (result.sr_row_id == (xtRowID) -1) {
3615
current = node->i_branch;
3616
goto search_down_stack;
3620
goto unlock_check_on_key;
3626
search_key->sk_on_key = FALSE;
3627
ot->ot_curr_rec_id = 0;
3628
ot->ot_curr_row_id = 0;
3630
XT_INDEX_UNLOCK(ind, ot);
3633
unlock_check_on_key:
3634
ASSERT_NS(!ot->ot_ind_rhandle);
3635
if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
3640
branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
3641
memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
3642
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
3645
XT_INDEX_UNLOCK(ind, ot);
3648
if (search_key && search_key->sk_on_key) {
3649
xt_ind_lock_handle(ot->ot_ind_rhandle);
3650
search_key->sk_on_key = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length,
3651
search_key->sk_key_value.sv_key, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset]) == 0;
3652
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3656
ot->ot_curr_rec_id = result.sr_rec_id;
3657
ot->ot_curr_row_id = result.sr_row_id;
3658
ot->ot_ind_state = result.sr_item;
3662
XT_INDEX_UNLOCK(ind, ot);
3663
if (idx_out_of_memory_failure(ot))
3664
goto retry_after_oom;
3668
/* Return TRUE if the record matches the current index search! */
3669
xtPublic xtBool xt_idx_match_search(register XTOpenTablePtr XT_UNUSED(ot), register XTIndexPtr ind, register XTIdxSearchKeyPtr search_key, xtWord1 *buf, int mode)
3672
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
3674
myxt_create_key_from_row(ind, key_buf, (xtWord1 *) buf, NULL);
3675
r = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length, search_key->sk_key_value.sv_key, key_buf);
3677
case XT_S_MODE_MATCH:
3679
case XT_S_MODE_NEXT:
3681
case XT_S_MODE_PREV:
3687
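/* Estimate the selectivity of one index by sampling up to MAX_RECORDS
 * committed entries from the front and from the back of the index.
 * For each consecutive pair of sampled keys, every key segment whose
 * prefix differs has its is_selectivity counter incremented; afterwards
 * is_recs_in_range is the sampled total divided by that counter.
 * For example, if 200 entries are sampled and the first segment's prefix
 * changes 50 times, roughly 4 rows are expected per distinct prefix value.
 */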
static void idx_set_index_selectivity(XTOpenTablePtr ot, XTIndexPtr ind, XTThreadPtr thread)
3689
static const xtRecordID MAX_RECORDS = 100;
3691
XTIdxSearchKeyRec search_key;
3692
XTIndexSegPtr key_seg;
3693
u_int select_count[2] = {0, 0};
3694
xtWord1 key_buf[XT_INDEX_MAX_KEY_SIZE];
3696
xtWord1 *next_key_buf;
3701
/* These two variables are used to check for overlap if we have < 200 records */
3702
xtRecordID last_rec = 0; /* last record accounted in this iteration */
3703
xtRecordID last_iter_rec = 0; /* last record accounted in the previous iteration */
3705
xtBool (* xt_idx_iterator[2])(
3706
register struct XTOpenTable *ot, register struct XTIndex *ind, register XTIdxSearchKeyPtr search_key) = {
3712
xtBool (* xt_idx_begin[2])(
3713
struct XTOpenTable *ot, struct XTIndex *ind, register XTIdxSearchKeyPtr search_key) = {
3719
ind->mi_select_total = 0;
3720
key_seg = ind->mi_seg;
3721
for (i=0; i < ind->mi_seg_count; key_seg++, i++) {
3722
key_seg->is_selectivity = 1;
3723
key_seg->is_recs_in_range = 1;
3726
for (j=0; j < 2; j++) {
3727
xt_idx_prep_key(ind, &search_key, j == 0 ? XT_SEARCH_FIRST_FLAG : XT_SEARCH_AFTER_LAST_FLAG, NULL, 0);
3728
if (!(xt_idx_begin[j])(ot, ind, &search_key))
3731
/* Initialize the buffer with the first valid index entry: */
3732
while (!select_count[j] && ot->ot_curr_rec_id != last_iter_rec) {
3733
if (ot->ot_curr_row_id) {
3735
last_rec = ot->ot_curr_rec_id;
3737
key_len = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
3738
xt_ind_lock_handle(ot->ot_ind_rhandle);
3739
memcpy(key_buf, ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset, key_len);
3740
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3742
if (!(xt_idx_iterator[j])(ot, ind, &search_key))
3746
while (select_count[j] < MAX_RECORDS && ot->ot_curr_rec_id != last_iter_rec) {
3747
/* Check if the index entry is committed: */
3748
if (ot->ot_curr_row_id) {
3749
xt_ind_lock_handle(ot->ot_ind_rhandle);
3751
last_rec = ot->ot_curr_rec_id;
3753
next_key_len = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
3754
next_key_buf = ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset;
3758
key_seg = ind->mi_seg;
3759
for (i=0; i < ind->mi_seg_count; key_seg++, i++) {
3760
curr_len += myxt_key_seg_length(key_seg, curr_len, key_buf);
3761
if (!diff && myxt_compare_key(ind, 0, curr_len, key_buf, next_key_buf) != 0)
3764
key_seg->is_selectivity++;
3767
/* Store the key for the next comparison: */
3768
key_len = next_key_len;
3769
memcpy(key_buf, next_key_buf, key_len);
3770
xt_ind_unlock_handle(ot->ot_ind_rhandle);
3773
if (!(xt_idx_iterator[j])(ot, ind, &search_key))
3777
last_iter_rec = last_rec;
3779
if (ot->ot_ind_rhandle) {
3780
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread);
3781
ot->ot_ind_rhandle = NULL;
3787
select_total = select_count[0] + select_count[1];
3791
ind->mi_select_total = select_total;
3792
key_seg = ind->mi_seg;
3793
for (i=0; i < ind->mi_seg_count; key_seg++, i++) {
3794
recs = (u_int) ((double) select_total / (double) key_seg->is_selectivity + (double) 0.5);
3795
key_seg->is_recs_in_range = recs ? recs : 1;
3801
if (ot->ot_ind_rhandle) {
3802
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread);
3803
ot->ot_ind_rhandle = NULL;
3807
xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
3808
xt_log_and_clear_exception_ns();
3812
xtPublic void xt_ind_set_index_selectivity(XTOpenTablePtr ot, XTThreadPtr thread)
3814
XTTableHPtr tab = ot->ot_table;
3820
xt_lock_mutex_ns(&tab->tab_ind_stat_lock);
3821
if (tab->tab_ind_stat_calc_time < now) {
3822
if (!tab->tab_dic.dic_disable_index) {
3823
for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++)
3824
idx_set_index_selectivity(ot, *ind, thread);
3826
tab->tab_ind_stat_calc_time = time(NULL);
3828
xt_unlock_mutex_ns(&tab->tab_ind_stat_lock);
3832
* -----------------------------------------------------------------------
3837
static void idx_check_on_key(XTOpenTablePtr ot)
3839
u_int offs = ot->ot_ind_state.i_item_offset + ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
3843
if (ot->ot_curr_rec_id && ot->ot_ind_state.i_item_offset < ot->ot_ind_state.i_total_size) {
3844
xt_get_record_ref(&ot->ot_ind_rbuf.tb_data[offs], &rec_id, &row_id);
3846
ASSERT_NS(rec_id == ot->ot_curr_rec_id);
3851
static void idx_check_space(int depth)
3858
for (int i=0; i<depth; i++)
3865
//#define FILL_COMPRESS_BLOCKS
3867
#ifdef FILL_COMPRESS_BLOCKS
3868
#define COMPRESS_BLOCK_SIZE (1024 * 128)
3870
#define COMPRESS_BLOCK_SIZE 16384
3887
unsigned char out[COMPRESS_BLOCK_SIZE];
3888
unsigned char uncomp[COMPRESS_BLOCK_SIZE];
3889
xtWord1 precomp[COMPRESS_BLOCK_SIZE+16000];
3891
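/* Test-only prefix compression (DO_COMP_TEST): after copying the leading
 * node reference, each fixed-size item is written as a single byte giving
 * the number of leading bytes it shares with the previous item, followed
 * by the bytes that differ. E.g. if consecutive items share their first
 * 4 bytes, only the count byte 4 and the remaining bytes are stored.
 */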
u_int idx_precompress(XTIndexPtr ind, u_int node_ref_size, u_int item_size, u_int insize, xtWord1 *in_data, xtWord1 *out_data)
3893
xtWord1 *prev_item = NULL;
3894
xtWord1 *in_ptr = in_data;
3895
xtWord1 *out_ptr = out_data;
3899
if (insize >= node_ref_size) {
3900
memcpy(out_ptr, in_ptr, node_ref_size);
3901
insize -= node_ref_size;
3902
out_size += node_ref_size;
3903
in_ptr += node_ref_size;
3904
out_ptr += node_ref_size;
3907
while (insize >= item_size + node_ref_size) {
3910
while (same_size < item_size + node_ref_size && *prev_item == *in_ptr) {
3915
ASSERT_NS(same_size < 256);
3916
*out_ptr = (xtWord1) same_size;
3919
same_size = item_size + node_ref_size - same_size;
3920
memcpy(out_ptr, in_ptr, same_size);
3921
out_size += same_size;
3922
out_ptr += same_size;
3923
in_ptr += same_size;
3924
prev_item += same_size;
3928
memcpy(out_ptr, in_ptr, item_size + node_ref_size);
3929
out_size += item_size + node_ref_size;
3930
out_ptr += item_size + node_ref_size;
3931
in_ptr += item_size + node_ref_size;
3933
insize -= (item_size + node_ref_size);
3938
u_int idx_compress(u_int insize, xtWord1 *in_data, u_int outsize, xtWord1 *out_data)
3943
strm.zalloc = Z_NULL;
3944
strm.zfree = Z_NULL;
3945
strm.opaque = Z_NULL;
3947
strm.next_in = Z_NULL;
3948
ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION);
3949
strm.avail_out = outsize;
3950
strm.next_out = out_data;
3951
strm.avail_in = insize;
3952
strm.next_in = in_data;
3953
ret = deflate(&strm, Z_FINISH);
3955
return outsize - strm.avail_out;
3961
memset(&strm, 0, sizeof(strm));
3963
ret = BZ2_bzCompressInit(&strm, 1, 0, 0);
3964
strm.avail_out = outsize;
3965
strm.next_out = (char *) out_data;
3966
strm.avail_in = insize;
3967
strm.next_in = (char *) in_data;
3968
ret = BZ2_bzCompress(&strm, BZ_FINISH);
3970
BZ2_bzCompressEnd(&strm);
3971
return outsize - strm.avail_out;
3975
u_int idx_decompress(u_int insize, xtWord1 *in_data, u_int outsize, xtWord1 *out_data)
3980
strm.zalloc = Z_NULL;
3981
strm.zfree = Z_NULL;
3982
strm.opaque = Z_NULL;
3984
strm.next_in = Z_NULL;
3985
ret = inflateInit(&strm);
3986
strm.avail_out = outsize;
3987
strm.next_out = out_data;
3988
strm.avail_in = insize;
3989
strm.next_in = in_data;
3990
ret = inflate(&strm, Z_FINISH);
3992
return outsize - strm.avail_out;
3998
memset(&strm, 0, sizeof(strm));
4000
ret = BZ2_bzDecompressInit(&strm, 0, 0);
4001
strm.avail_out = outsize;
4002
strm.next_out = (char *) out_data;
4003
strm.avail_in = insize;
4004
strm.next_in = (char *) in_data;
4005
ret = BZ2_bzDecompress(&strm);
4007
BZ2_bzDecompressEnd(&strm);
4008
return outsize - strm.avail_out;
4011
#endif // DO_COMP_TEST
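/* For reference, a minimal self-contained zlib round trip in the same spirit as
 * the DO_COMP_TEST helpers above. This is only a sketch: the helper name is
 * invented, error checks are omitted, and deflateEnd()/inflateEnd() are added
 * here even though the listing above does not show whether the test code
 * released its streams.
 *
 *   #include <zlib.h>
 *   #include <string.h>
 *
 *   static unsigned comp_round_trip(const unsigned char *in, unsigned in_len,
 *       unsigned char *comp, unsigned comp_size,
 *       unsigned char *out, unsigned out_size)
 *   {
 *       z_stream strm;
 *
 *       memset(&strm, 0, sizeof(strm));
 *       deflateInit(&strm, Z_DEFAULT_COMPRESSION);
 *       strm.next_in = (Bytef *) in;   strm.avail_in = in_len;
 *       strm.next_out = comp;          strm.avail_out = comp_size;
 *       deflate(&strm, Z_FINISH);
 *       unsigned comp_len = comp_size - strm.avail_out;
 *       deflateEnd(&strm);
 *
 *       memset(&strm, 0, sizeof(strm));
 *       inflateInit(&strm);
 *       strm.next_in = comp;           strm.avail_in = comp_len;
 *       strm.next_out = out;           strm.avail_out = out_size;
 *       inflate(&strm, Z_FINISH);
 *       unsigned out_len = out_size - strm.avail_out;
 *       inflateEnd(&strm);
 *       return out_len;                // should equal in_len
 *   }
 */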
4013
static u_int idx_check_node(XTOpenTablePtr ot, XTIndexPtr ind, int depth, xtIndexNodeID node)
4015
XTIdxResultRec result;
4016
u_int block_count = 1;
4017
XTIndReferenceRec iref;
4021
unsigned uncomp_size;
4028
iref.ir_updated = 2;
4030
ASSERT_NS(XT_NODE_ID(node) <= XT_NODE_ID(ot->ot_table->tab_ind_eof));
4032
now = xt_trace_clock();
4034
/* A deadlock can occur when taking a read lock
 * because the XT_IPAGE_WRITE_TRY_LOCK(&block->cb_lock, ot->ot_thread->t_id)
 * only takes into account WRITE locks.
 * So, if we hold a READ lock on a page, and ind_free_block() tries to
 * free the block, it hangs on its own read lock!
 *
 * So we change from a READ lock to a WRITE lock.
 * If this is too restrictive, then the locks need to handle TRY on a
 * read lock as well.
 *
 * #3 0x00e576b6 in xt_yield at thread_xt.cc:1351
 * #4 0x00e7218e in xt_spinxslock_xlock at lock_xt.cc:1467
 * #5 0x00dee1a9 in ind_free_block at cache_xt.cc:901
 * #6 0x00dee500 in ind_cac_free_lru_blocks at cache_xt.cc:1054
 * #7 0x00dee88c in ind_cac_fetch at cache_xt.cc:1151
 * #8 0x00def6d4 in xt_ind_fetch at cache_xt.cc:1480
 * #9 0x00e1ce2e in idx_check_node at index_xt.cc:3996
 * #10 0x00e1cf2b in idx_check_node at index_xt.cc:4106
 * #11 0x00e1cf2b in idx_check_node at index_xt.cc:4106
 * #12 0x00e1cfdc in idx_check_index at index_xt.cc:4130
 * #13 0x00e1d11c in xt_check_indices at index_xt.cc:4181
 * #14 0x00e4aa82 in xt_check_table at table_xt.cc:2363
 */
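/* A short timeline of the self-deadlock described above (my summary of the
 * comment, not additional analysis):
 *
 *   1. idx_check_node() fetches page P with a READ lock and recurses.
 *   2. The nested fetch needs cache and ind_cac_free_lru_blocks() picks P
 *      as an eviction victim.
 *   3. ind_free_block() cannot see that this thread already holds P as a
 *      reader (the TRY check only considers WRITE holders), so it waits
 *      for the readers to go away -- including itself.
 *
 * Fetching with XT_LOCK_WRITE below means the TRY check does notice the
 * holder and skips the page instead of hanging.
 */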
if (!xt_ind_fetch(ot, ind, node, XT_LOCK_WRITE, &iref))
4060
time = xt_trace_clock() - now;
4064
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
4065
ASSERT_NS(result.sr_item.i_total_size + offsetof(XTIdxBranchDRec, tb_data) <= XT_INDEX_PAGE_SIZE);
4068
u_int size = result.sr_item.i_total_size;
4069
xtWord1 *data = iref.ir_branch->tb_data;
4072
size = idx_precompress(ind, result.sr_item.i_node_ref_size, result.sr_item.i_item_size, size, data, precomp);
4073
if (size > result.sr_item.i_total_size)
4074
size = result.sr_item.i_total_size;
4080
usage_total += result.sr_item.i_total_size;
4082
#ifdef FILL_COMPRESS_BLOCKS
4083
if (fill_size + size > COMPRESS_BLOCK_SIZE) {
4084
now = xt_trace_clock();
4085
comp_size = idx_compress(fill_size, precomp, COMPRESS_BLOCK_SIZE, out);
4086
time = xt_trace_clock() - now;
4089
zlib_total += comp_size;
4090
filled_size += fill_size;
4092
now = xt_trace_clock();
4093
uncomp_size = idx_decompress(comp_size, out, COMPRESS_BLOCK_SIZE, uncomp);
4094
time = xt_trace_clock() - now;
4095
uncomp_time += time;
4097
if (uncomp_size != fill_size)
4102
memcpy(precomp + fill_size, data, size);
4105
now = xt_trace_clock();
4106
comp_size = idx_compress(size, data, COMPRESS_BLOCK_SIZE, out);
4107
time = xt_trace_clock() - now;
4109
zlib_total += comp_size;
4111
now = xt_trace_clock();
4112
uncomp_size = idx_decompress(comp_size, out, COMPRESS_BLOCK_SIZE, uncomp);
4113
time = xt_trace_clock() - now;
4114
uncomp_time += time;
4115
if (uncomp_size != size)
4119
if (comp_size <= 1024)
4121
else if (comp_size <= 2048)
4123
else if (comp_size <= 4096)
4125
else if (comp_size <= 8192)
4130
#endif // DO_COMP_TEST
4132
if (result.sr_item.i_node_ref_size) {
4133
idx_check_space(depth);
4135
printf("%04d -->\n", (int) XT_NODE_ID(result.sr_branch));
4137
#ifdef TRACK_ACTIVITY
4138
track_block_exists(result.sr_branch);
4140
block_count += idx_check_node(ot, ind, depth+1, result.sr_branch);
4143
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
4144
#ifdef CHECK_PRINTS_RECORD_REFERENCES
4145
idx_check_space(depth);
4146
if (result.sr_item.i_item_size == 12) {
4147
/* Assume this is a NOT-NULL INT!: */
4148
xtWord4 val = XT_GET_DISK_4(&iref.ir_branch->tb_data[result.sr_item.i_item_offset]);
4150
printf("(%6d) ", (int) val);
4154
printf("rec=%d row=%d ", (int) result.sr_rec_id, (int) result.sr_row_id);
4158
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
4159
if (result.sr_item.i_node_ref_size) {
4160
idx_check_space(depth);
4162
printf("%04d -->\n", (int) XT_NODE_ID(result.sr_branch));
4164
#ifdef TRACK_ACTIVITY
4165
track_block_exists(result.sr_branch);
4167
block_count += idx_check_node(ot, ind, depth+1, result.sr_branch);
4171
xt_ind_release(ot, ind, XT_UNLOCK_WRITE, &iref);
4175
static u_int idx_check_index(XTOpenTablePtr ot, XTIndexPtr ind, xtBool with_lock)
4177
xtIndexNodeID current;
4178
u_int block_count = 0;
4182
XT_INDEX_WRITE_LOCK(ind, ot);
4185
printf("INDEX (%d) %04d ---------------------------------------\n", (int) ind->mi_index_no, (int) XT_NODE_ID(ind->mi_root));
4187
if ((XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
4188
#ifdef TRACK_ACTIVITY
4189
track_block_exists(ind->mi_root);
4191
block_count = idx_check_node(ot, ind, 0, current);
4194
if (ind->mi_free_list && ind->mi_free_list->fl_free_count) {
4196
printf("INDEX (%d) FREE ---------------------------------------", (int) ind->mi_index_no);
4198
ASSERT_NS(ind->mi_free_list->fl_start == 0);
4199
for (i=0; i<ind->mi_free_list->fl_free_count; i++) {
4205
#ifdef TRACK_ACTIVITY
4206
track_block_exists(ind->mi_free_list->fl_page_id[i]);
4209
printf("%2d ", (int) XT_NODE_ID(ind->mi_free_list->fl_page_id[i]));
4219
XT_INDEX_UNLOCK(ind, ot);
4224
xtPublic void xt_check_indices(XTOpenTablePtr ot)
4226
register XTTableHPtr tab = ot->ot_table;
4228
xtIndexNodeID current;
4229
XTIndFreeBlockRec free_block;
4230
u_int ind_count, block_count = 0;
4231
u_int free_count = 0;
4234
xt_lock_mutex_ns(&tab->tab_ind_flush_lock);
4235
printf("CHECK INDICES %s ==============================\n", tab->tab_name->ps_path);
4236
#ifdef TRACK_ACTIVITY
4237
track_reset_missing();
4240
ind = tab->tab_dic.dic_keys;
4241
for (u_int k=0; k<tab->tab_dic.dic_key_count; k++, ind++) {
4242
ind_count = idx_check_index(ot, *ind, TRUE);
4243
block_count += ind_count;
4249
#ifdef FILL_COMPRESS_BLOCKS
4250
if (fill_size > 0) {
4252
unsigned uncomp_size;
4256
now = xt_trace_clock();
4257
comp_size = idx_compress(fill_size, precomp, COMPRESS_BLOCK_SIZE, out);
4258
time = xt_trace_clock() - now;
4260
zlib_total += comp_size;
4261
filled_size += fill_size;
4263
now = xt_trace_clock();
4264
uncomp_size = idx_decompress(comp_size, out, COMPRESS_BLOCK_SIZE, uncomp);
4265
time = xt_trace_clock() - now;
4266
uncomp_time += time;
4268
if (filled_size != usage_total)
4272
printf("Total blocks = %d\n", blocks);
4273
printf("zlib <= 1024 = %d\n", zlib_1024);
4274
printf("zlib <= 2048 = %d\n", zlib_2048);
4275
printf("zlib <= 4096 = %d\n", zlib_4096);
4276
printf("zlib <= 8192 = %d\n", zlib_8192);
4277
printf("zlib <= 16384 = %d\n", zlib_16384);
4278
printf("zlib average size = %.2f\n", (double) zlib_total / (double) blocks);
4279
printf("zlib average time = %.2f\n", (double) zlib_time / (double) blocks);
4280
printf("read average time = %.2f\n", (double) read_time / (double) blocks);
4281
printf("uncompress time = %.2f\n", (double) uncomp_time / (double) blocks);
4282
block_total = (zlib_1024 + zlib_2048) * 8192;
4283
block_total += zlib_4096 * 8192;
4284
block_total += zlib_8192 * 8192;
4285
block_total += zlib_16384 * 16384;
4286
printf("block total = %d\n", block_total);
4287
printf("block %% compress = %.2f\n", ((double) block_total * (double) 100) / ((double) blocks * (double) 16384));
4288
printf("Total size = %d\n", blocks * 16384);
4289
printf("total before zlib = %d\n", usage_total);
4290
printf("total after zlib = %d\n", zlib_total);
4291
printf("zlib %% compress = %.2f\n", ((double) zlib_total * (double) 100) / (double) usage_total);
4292
printf("total %% compress = %.2f\n", ((double) zlib_total * (double) 100) / (double) (blocks * 16384));
4295
xt_lock_mutex_ns(&tab->tab_ind_lock);
4297
printf("\nFREE: ---------------------------------------\n");
4299
if (tab->tab_ind_free_list) {
4300
XTIndFreeListPtr ptr;
4302
ptr = tab->tab_ind_free_list;
4305
printf("Memory List:");
4308
for (j=ptr->fl_start; j<ptr->fl_free_count; j++, i++) {
4314
#ifdef TRACK_ACTIVITY
4315
track_block_exists(ptr->fl_page_id[j]);
4318
printf("%2d ", (int) XT_NODE_ID(ptr->fl_page_id[j]));
4325
ptr = ptr->fl_next_list;
4329
current = tab->tab_ind_free;
4330
if (XT_NODE_ID(current)) {
4333
printf("Disk List:");
4335
while (XT_NODE_ID(current)) {
4341
#ifdef TRACK_ACTIVITY
4342
track_block_exists(current);
4345
printf("%d ", (int) XT_NODE_ID(current));
4347
if (!xt_ind_read_bytes(ot, *ind, current, sizeof(XTIndFreeBlockRec), (xtWord1 *) &free_block)) {
4348
xt_log_and_clear_exception_ns();
4351
XT_NODE_ID(current) = (xtIndexNodeID) XT_GET_DISK_8(free_block.if_next_block_8);
4360
printf("\n-----------------------------\n");
4361
printf("used blocks %d + free blocks %d = %d\n", block_count, free_count, block_count + free_count);
4362
printf("EOF = %"PRIu64", total blocks = %d\n", (xtWord8) xt_ind_node_to_offset(tab, tab->tab_ind_eof), (int) (XT_NODE_ID(tab->tab_ind_eof) - 1));
4363
printf("-----------------------------\n");
4365
xt_unlock_mutex_ns(&tab->tab_ind_lock);
4366
#ifdef TRACK_ACTIVITY
4367
track_dump_missing(tab->tab_ind_eof);
4368
printf("===================================================\n");
4369
track_dump_all((u_int) (XT_NODE_ID(tab->tab_ind_eof) - 1));
4371
printf("===================================================\n");
4372
xt_unlock_mutex_ns(&tab->tab_ind_flush_lock);
4376
* -----------------------------------------------------------------------
4380
static void idx_load_node(XTThreadPtr self, XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID node)
4382
XTIdxResultRec result;
4383
XTIndReferenceRec iref;
4385
ASSERT_NS(XT_NODE_ID(node) <= XT_NODE_ID(ot->ot_table->tab_ind_eof));
4386
if (!xt_ind_fetch(ot, ind, node, XT_LOCK_READ, &iref))
4389
idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
4390
if (result.sr_item.i_node_ref_size)
4391
idx_load_node(self, ot, ind, result.sr_branch);
4392
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
4393
idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
4394
if (result.sr_item.i_node_ref_size)
4395
idx_load_node(self, ot, ind, result.sr_branch);
4398
xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
4401
xtPublic void xt_load_indices(XTThreadPtr self, XTOpenTablePtr ot)
4403
register XTTableHPtr tab = ot->ot_table;
4404
XTIndexPtr *ind_ptr;
4406
xtIndexNodeID current;
4408
xt_lock_mutex(self, &tab->tab_ind_flush_lock);
4409
pushr_(xt_unlock_mutex, &tab->tab_ind_flush_lock);
4411
ind_ptr = tab->tab_dic.dic_keys;
4412
for (u_int k=0; k<tab->tab_dic.dic_key_count; k++, ind_ptr++) {
4414
XT_INDEX_WRITE_LOCK(ind, ot);
4415
if ((XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
4416
idx_load_node(self, ot, ind, current);
4417
XT_INDEX_UNLOCK(ind, ot);
4420
freer_(); // xt_unlock_mutex(&tab->tab_ind_flush_lock)
4424
* -----------------------------------------------------------------------
 * Count the number of deleted entries in a node:
 *
 * {LAZY-DEL-INDEX-ITEMS}
 *
 * Use this function to count the number of deleted items
 * in a node when it is loaded.
 *
 * The count helps us decide whether the node should be "packed".
 */
xtPublic void xt_ind_count_deleted_items(XTTableHPtr tab, XTIndexPtr ind, XTIndBlockPtr block)
4438
XTIdxResultRec result;
4440
xtWord2 branch_size;
4442
branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
4444
/* This is possible when reading free pages. */
4445
if (XT_GET_INDEX_BLOCK_LEN(branch_size) < 2 || XT_GET_INDEX_BLOCK_LEN(branch_size) > XT_INDEX_PAGE_SIZE)
4448
idx_first_branch_item(tab, ind, (XTIdxBranchDPtr) block->cb_data, &result);
4449
while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
4450
if (result.sr_row_id == (xtRowID) -1)
4452
idx_next_branch_item(tab, ind, (XTIdxBranchDPtr) block->cb_data, &result);
4454
block->cp_del_count = del_count;
4458
* -----------------------------------------------------------------------
4462
xtBool XTIndDirtyList::dl_add_block(XTIndBlockPtr block)
4464
XTIndDirtyBlocksPtr blocks;
4466
blocks = dl_block_lists;
4467
if (dl_list_usage == XT_DIRTY_BLOCK_LIST_SIZE || !blocks) {
4468
if (!(blocks = (XTIndDirtyBlocksPtr) xt_malloc_ns(sizeof(XTIndDirtyBlocksRec))))
4471
blocks->db_next = dl_block_lists;
4472
dl_block_lists = blocks;
4474
blocks->db_blocks[dl_list_usage] = block;
4480
static int idx_compare_blocks(const void *a, const void *b)
4482
XTIndBlockPtr b_a = *((XTIndBlockPtr *) a);
4483
XTIndBlockPtr b_b = *((XTIndBlockPtr *) b);
4485
if (b_a->cb_address == b_b->cb_address)
4487
if (b_a->cb_address < b_b->cb_address)
4492
void XTIndDirtyList::dl_sort_blocks()
4494
XTIndDirtyBlocksPtr blocks;
4497
size = dl_list_usage;
4498
blocks = dl_block_lists;
4500
qsort(blocks->db_blocks, size, sizeof(XTIndBlockPtr), idx_compare_blocks);
4501
blocks = blocks->db_next;
4502
size = XT_DIRTY_BLOCK_LIST_SIZE;
4506
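/* Why sort before flushing (my note, not from the original comments): the dirty
 * blocks are collected per index in arrival order, but they are all written to
 * the same index file. Sorting each list chunk by cb_address means the writes
 * that follow are issued in ascending file-offset order, keeping the I/O largely
 * sequential. The comparator above only has to return <0, 0 or >0, as qsort()
 * requires.
 */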
void XTIndDirtyList::dl_free_all()
4508
XTIndDirtyBlocksPtr blocks, n_blocks;
4510
blocks = dl_block_lists;
4511
dl_block_lists = NULL;
4512
dl_total_blocks = 0;
4515
n_blocks = blocks->db_next;
4522
* -----------------------------------------------------------------------
4523
* Index consistent flush
4526
xtBool XTFlushIndexTask::tk_task(XTThreadPtr thread)
4530
fit_dirty_blocks = 0;
4531
fit_blocks_flushed = 0;
4533
/* See {TASK-TABLE-GONE} */
4534
if (!(xt_db_open_pool_table_ns(&ot, fit_table->tab_db, fit_table->tab_id)))
4538
/* Can happen if the table has been dropped: */
4539
if (thread->t_exception.e_xt_err)
4540
xt_log_and_clear_exception(thread);
4541
xt_logf(XT_NT_WARNING, "Checkpoint skipping table (ID) %lu: table was not found\n", (u_long) fit_table->tab_id);
4542
xt_checkpoint_set_flush_state(fit_table->tab_db, fit_table->tab_id, XT_CPT_STATE_DONE_ALL);
4546
if (ot->ot_table != fit_table) {
4547
/* Can happen if the table has been renamed: */
4548
if (thread->t_exception.e_xt_err)
4549
xt_log_and_clear_exception(thread);
4550
xt_logf(XT_NT_WARNING, "Checkpoint skipping table (ID) %lu: table has been renamed\n", (u_long) fit_table->tab_id);
4551
xt_checkpoint_set_flush_state(fit_table->tab_db, fit_table->tab_id, XT_CPT_STATE_DONE_ALL);
4555
if (!xt_flush_indices(ot, NULL, FALSE, this)) {
4556
xt_db_return_table_to_pool_ns(ot);
4561
xt_db_return_table_to_pool_ns(ot);
4565
void XTFlushIndexTask::tk_reference()
4567
xt_heap_reference_ns(fit_table);
4570
void XTFlushIndexTask::tk_release()
4572
xt_heap_release_ns(fit_table);
/*
 * Set notify_before_write to TRUE if the caller requires
 * notification before the index file is written.
 *
 * This is used if the index is flushed due to a lack of index cache.
 */
xtPublic xtBool xt_async_flush_indices(XTTableHPtr tab, xtBool notify_complete, xtBool notify_before_write, XTThreadPtr thread)
4584
return xt_run_async_task(tab->tab_ind_flush_task, notify_complete, notify_before_write, thread, tab->tab_db);
4587
#if defined(PRINT_IND_FLUSH_STATS) || defined(TRACE_FLUSH_TIMES)
4589
static char *idx_format(char *buffer, double v)
4592
sprintf(buffer, "%9.2f", v);
4593
if (strcmp(buffer, " nan") == 0)
4594
strcpy(buffer, " ");
4597
strcpy(buffer, " ");
4601
static char *idx_format_mb(char *buffer, double v)
4604
sprintf(buffer, "%7.3f", v / (double) 1024);
4605
if (strcmp(buffer, " nan") == 0)
4606
strcpy(buffer, " ");
4609
strcpy(buffer, " ");
4614
#ifdef TRACE_FLUSH_TIMES
4616
#define ILOG_FLUSH 1
4617
#define INDEX_FLUSH 2
4626
static void idx_print(char *msg, XTThreadPtr thread, struct idxstats *st, xtWord8 *now, int flush)
4630
double dilogw, didxw;
4636
then = xt_trace_clock();
4638
ilogw = (double) (thread->st_statistics.st_ilog.ts_write - st->i_log_write) / (double) 1024;
4639
dilogw = ((double) ilogw * (double) 1000000) / (double) t;
4640
idxw = (double) (thread->st_statistics.st_ind.ts_write - st->idx_write) / (double) 1024;
4641
didxw = ((double) idxw * (double) 1000000) / (double) t;
4643
printf("%26s | TIME: %7d ", msg, (int) t);
4644
printf("ILOG: %s - %s INDX: %s - %s\n",
4645
idx_format_mb(buf1, dilogw), idx_format(buf2, ilogw),
4646
idx_format_mb(buf3, didxw), idx_format(buf4, idxw));
4647
st->i_log_write = thread->st_statistics.st_ilog.ts_write;
4648
st->idx_write = thread->st_statistics.st_ind.ts_write;
4652
ilogw = (double) (thread->st_statistics.st_ilog.ts_write - st->i_log_flush) / (double) 1024;
4653
dilogw = ((double) ilogw * (double) 1000000) / (double) t;
4654
printf("%26s | TIME: %7s ", " ", " ");
4655
printf("ILOG: %s - %s INDX: %s - %s\n",
4656
idx_format_mb(buf1, dilogw), idx_format(buf2, ilogw),
4657
idx_format_mb(buf3, 0.0), idx_format(buf4, 0.0));
4658
st->i_log_flush = thread->st_statistics.st_ilog.ts_write;
4661
idxw = (double) (thread->st_statistics.st_ind.ts_write - st->idx_flush) / (double) 1024;
4662
didxw = ((double) idxw * (double) 1000000) / (double) t;
4663
printf("%26s | TIME: %7s ", " ", " ");
4664
printf("ILOG: %s - %s INDX: %s - %s\n",
4665
idx_format_mb(buf1, 0.0), idx_format(buf2, 0.0),
4666
idx_format_mb(buf3, didxw), idx_format(buf4, idxw));
4667
st->idx_flush = thread->st_statistics.st_ind.ts_write;
4671
*now = xt_trace_clock();
4674
#define TRACE_FLUSH(a, b, c, d, e) idx_print(a, b, c, d, e)
4676
#else // TRACE_FLUSH_TIMES
4678
#define TRACE_FLUSH(a, b, c, d, e)
4680
#endif // TRACE_FLUSH_TIMES
4682
/* Flush the indexes of a table.
 * If a fit is given, then this means this is an asynchronous flush.
 */
xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool have_table_lock, XTFlushIndexTask *fit)
4687
register XTTableHPtr tab = ot->ot_table;
4692
XTIndBlockPtr block, fblock;
4694
xtIndexNodeID ind_free;
4695
xtBool block_on_free_list = FALSE;
4696
xtIndexNodeID last_address, next_address;
4697
XTIndFreeListPtr list_ptr;
4699
XTIndDirtyListItorRec it;
4700
//u_int dirty_count;
4701
#ifdef TRACE_FLUSH_INDEX
4705
#ifdef TRACE_FLUSH_TIMES
4706
XTThreadPtr thread = ot->ot_thread;
4709
st.i_log_flush = thread->st_statistics.st_ilog.ts_write;
4710
st.i_log_write = thread->st_statistics.st_ilog.ts_write;
4711
st.idx_flush = thread->st_statistics.st_ind.ts_write;
4712
st.idx_write = thread->st_statistics.st_ind.ts_write;
4713
now = xt_trace_clock();
4716
#ifdef DEBUG_CHECK_IND_CACHE
4717
xt_ind_check_cache(NULL);
4719
xt_lock_mutex_ns(&tab->tab_ind_flush_lock);
4720
TRACE_FLUSH("LOCKED flush index lock", thread, &st, &now, 0);
4722
if (!xt_begin_checkpoint(tab->tab_db, have_table_lock, ot->ot_thread))
4725
ASSERT_NS(!tab->tab_ind_flush_ilog);
4726
if (!tab->tab_db->db_indlogs.ilp_get_log(&tab->tab_ind_flush_ilog, ot->ot_thread))
4728
il = tab->tab_ind_flush_ilog;
4730
if (!il->il_reset(ot))
4732
if (!il->il_write_byte(ot, XT_DT_LOG_HEAD))
4734
if (!il->il_write_word4(ot, tab->tab_id))
4736
if (!il->il_write_word4(ot, 0))
4738
TRACE_FLUSH("reset ilog", thread, &st, &now, 0);
4742
indp = tab->tab_dic.dic_keys;
4743
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4745
XT_INDEX_WRITE_LOCK(ind, ot);
4746
if (ind->mi_free_list && ind->mi_free_list->fl_free_count)
4747
block_on_free_list = TRUE;
4748
dirty_blocks += ind->mi_dirty_blocks;
4750
TRACE_FLUSH("LOCKED all indexes", thread, &st, &now, 0);
4752
if (!dirty_blocks && !block_on_free_list) {
4753
/* Nothing to flush... */
4754
indp = tab->tab_dic.dic_keys;
4755
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4757
XT_INDEX_UNLOCK(ind, ot);
4762
#ifdef TRACE_FLUSH_INDEX
4764
printf("FLUSH INDEX pages=%lu %s\n", (u_long) dirty_blocks, tab->tab_name->ps_path);
4768
fit->fit_dirty_blocks = dirty_blocks;
4770
// 128 dirty blocks == 2MB
4772
*bytes_flushed += (dirty_blocks * XT_INDEX_PAGE_SIZE);
4774
/* Collect the index roots: */
4775
data = tab->tab_index_head->tp_data;
4777
/* Collect a complete list of all dirty blocks: */
4778
indp = tab->tab_dic.dic_keys;
4779
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4781
xt_spinlock_lock(&ind->mi_dirty_lock);
4782
if ((block = ind->mi_dirty_list)) {
4784
ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_DIRTY);
4785
#ifdef IND_OPT_DATA_WRITTEN
4786
ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
4788
tab->tab_ind_dirty_list.dl_add_block(block);
4789
fblock = block->cb_dirty_next;
4790
block->cb_dirty_next = NULL;
4791
block->cb_dirty_prev = NULL;
4792
block->cb_state = IDX_CAC_BLOCK_FLUSHING;
4796
//dirty_count = ind->mi_dirty_blocks;
4797
ind->mi_dirty_blocks = 0;
4798
ind->mi_dirty_list = NULL;
4799
xt_spinlock_unlock(&ind->mi_dirty_lock);
4800
//ot->ot_thread->st_statistics.st_ind_cache_dirty -= dirty_count;
4801
XT_SET_NODE_REF(tab, data, ind->mi_root);
4802
data += XT_NODE_REF_SIZE;
4805
TRACE_FLUSH("Collected all blocks", thread, &st, &now, 0);
4807
xt_lock_mutex_ns(&tab->tab_ind_lock);
4808
TRACE_FLUSH("LOCKED table index lock", thread, &st, &now, 0);
4810
/* Write the free list: */
4811
if (block_on_free_list) {
4813
xtWord1 buffer[XT_BLOCK_SIZE_FOR_DIRECT_IO];
4814
XTIndFreeBlockRec free_block;
4816
memset(x.buffer, 0, sizeof(XTIndFreeBlockRec));
4818
/* The old start of the free list: */
4819
XT_NODE_ID(ind_free) = 0;
4820
/* This is a list of lists: */
4821
while ((list_ptr = tab->tab_ind_free_list)) {
4822
/* If this free list still has unused blocks,
 * pick the first. That is the front of
 * the list of free blocks.
 */
if (list_ptr->fl_start < list_ptr->fl_free_count) {
4827
ind_free = list_ptr->fl_page_id[list_ptr->fl_start];
4830
/* This list is empty, free it: */
4831
tab->tab_ind_free_list = list_ptr->fl_next_list;
4832
xt_free_ns(list_ptr);
4834
/* If nothing is on any list, then
 * take the value stored in the index header.
 * It is the front of the list on disk.
 */
if (!XT_NODE_ID(ind_free))
4839
ind_free = tab->tab_ind_free;
4841
if (!il->il_write_byte(ot, XT_DT_FREE_LIST))
4843
indp = tab->tab_dic.dic_keys;
4844
XT_NODE_ID(last_address) = 0;
4845
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4847
if (ind->mi_free_list && ind->mi_free_list->fl_free_count) {
4848
for (j=0; j<ind->mi_free_list->fl_free_count; j++) {
4849
next_address = ind->mi_free_list->fl_page_id[j];
4850
/* Write out the IDs of the free blocks. */
4851
if (!il->il_write_word4(ot, XT_NODE_ID(ind->mi_free_list->fl_page_id[j])))
4853
if (XT_NODE_ID(last_address)) {
4854
/* Update the page in cache, if it is in the cache! */
4855
XT_SET_DISK_8(x.free_block.if_next_block_8, XT_NODE_ID(next_address));
4856
if (!xt_ind_write_cache(ot, last_address, 8, x.buffer))
4859
last_address = next_address;
4863
if (!il->il_write_word4(ot, XT_NODE_ID(ind_free)))
4865
if (XT_NODE_ID(last_address)) {
4866
XT_SET_DISK_8(x.free_block.if_next_block_8, XT_NODE_ID(tab->tab_ind_free));
4867
if (!xt_ind_write_cache(ot, last_address, 8, x.buffer))
4870
if (!il->il_write_word4(ot, 0xFFFFFFFF))
/*
 * Add the free list caches to the global free list cache.
 * Added backwards to match the write order.
 */
indp = tab->tab_dic.dic_keys + tab->tab_dic.dic_key_count-1;
4879
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp--) {
4881
if (ind->mi_free_list) {
4882
ind->mi_free_list->fl_next_list = tab->tab_ind_free_list;
4883
tab->tab_ind_free_list = ind->mi_free_list;
4885
ind->mi_free_list = NULL;
/*
 * The new start of the free list is the first
 * item on the table free list:
 */
XT_NODE_ID(ind_free) = 0;
4893
while ((list_ptr = tab->tab_ind_free_list)) {
4894
if (list_ptr->fl_start < list_ptr->fl_free_count) {
4895
ind_free = list_ptr->fl_page_id[list_ptr->fl_start];
4898
tab->tab_ind_free_list = list_ptr->fl_next_list;
4899
xt_free_ns(list_ptr);
4901
if (!XT_NODE_ID(ind_free))
4902
ind_free = tab->tab_ind_free;
4903
TRACE_FLUSH("did free block stuff", thread, &st, &now, 0);
4904
xt_unlock_mutex_ns(&tab->tab_ind_lock);
4906
XT_SET_DISK_6(tab->tab_index_head->tp_ind_eof_6, XT_NODE_ID(tab->tab_ind_eof));
4907
XT_SET_DISK_6(tab->tab_index_head->tp_ind_free_6, XT_NODE_ID(ind_free));
4909
TRACE_FLUSH("UN-LOCKED table index lock", thread, &st, &now, 0);
4910
if (!il->il_write_header(ot, XT_INDEX_HEAD_SIZE, (xtWord1 *) tab->tab_index_head))
4913
TRACE_FLUSH("wrote ilog header", thread, &st, &now, 0);
4914
indp = tab->tab_dic.dic_keys;
4915
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
4917
XT_INDEX_UNLOCK(ind, ot);
4920
TRACE_FLUSH("UN-LOCKED all indexes", thread, &st, &now, 0);
4922
/* Write all blocks to the index log: */
4923
if (tab->tab_ind_dirty_list.dl_total_blocks) {
4924
tab->tab_ind_dirty_list.dl_sort_blocks();
4926
while ((block = tab->tab_ind_dirty_list.dl_next_block(&it))) {
4927
XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
4928
if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
4929
if (!il->il_write_block(ot, block)) {
4930
XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
4934
XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
4938
/* {PAGE-NO-IN-INDEX-FILE}
 * At this point all blocks have been written to the index file.
 * It is now safe to release them from the cache.
 *
 * It was not safe to do this before this point because
 * a read would have read the wrong data.
 *
 * The exception to this is freed blocks.
 * These are cached separately in the free block list.
 */
TRACE_FLUSH("Wrote all blocks to the ilog", thread, &st, &now, 0);
4949
if (il->il_data_written()) {
4950
/* Flush the log before we flush the index.
 *
 * The reason is, we must make sure that changes that
 * will be in the index are already in the transaction
 * log. Only then are we able to undo those changes on
 * recovery.
 *
 * CREATE TABLE t1 (s1 INT PRIMARY KEY);
 * INSERT INTO t1 VALUES (1);
 *
 * INSERT INTO t1 VALUES (2);
 *
 * --- INDEX IS FLUSHED HERE ---
 *
 * --- SERVER CRASH HERE ---
 *
 * The INSERT VALUES (2) has been written
 * to the log, but not flushed.
 * But the index has been updated.
 * If the index is flushed, it will contain
 * the entry for the record with s1=2.
 *
 * This entry must be removed on recovery.
 *
 * To prevent this situation I flush the log
 * before flushing the index.
 */
if (!XT_IS_TEMP_TABLE(tab->tab_dic.dic_tab_flags)) {
4983
/* Note, thread->st_database may not be set here:
 * #0 0x00defba3 in xt_spinlock_set at lock_xt.h:249
 * #1 0x00defbfd in xt_spinlock_lock at lock_xt.h:299
 * #2 0x00e65a98 in XTDatabaseLog::xlog_flush_pending at xactlog_xt.cc:737
 * #3 0x00e6a27f in XTDatabaseLog::xlog_flush at xactlog_xt.cc:727
 * #4 0x00e6a308 in xt_xlog_flush_log at xactlog_xt.cc:1476
 * #5 0x00e22fee in xt_flush_indices at index_xt.cc:4599
 * #6 0x00e49fe0 in xt_close_table at table_xt.cc:2678
 * #7 0x00df0f10 in xt_db_pool_exit at database_xt.cc:758
 * #8 0x00df3ca2 in db_finalize at database_xt.cc:342
 * #9 0x00e17037 in xt_heap_release at heap_xt.cc:110
 * #10 0x00df07c0 in db_hash_free at database_xt.cc:245
 * #11 0x00e0b145 in xt_free_hashtable at hashtab_xt.cc:84
 * #12 0x00df093e in xt_exit_databases at database_xt.cc:303
 * #13 0x00e0d3cf in pbxt_call_exit at ha_pbxt.cc:946
 * #14 0x00e0d498 in ha_exit at ha_pbxt.cc:975
 * #15 0x00e0dce6 in pbxt_end at ha_pbxt.cc:1274
 * #16 0x00e0dd03 in pbxt_panic at ha_pbxt.cc:1287
 * #17 0x002321a1 in ha_finalize_handlerton at handler.cc:392
 * #18 0x002f0567 in plugin_deinitialize at sql_plugin.cc:815
 * #19 0x002f370e in reap_plugins at sql_plugin.cc:903
 * #20 0x002f3eac in plugin_shutdown at sql_plugin.cc:1512
 * #21 0x000f3ba6 in clean_up at mysqld.cc:1238
 * #22 0x000f4046 in unireg_end at mysqld.cc:1166
 * #23 0x000fc2c5 in kill_server at mysqld.cc:1108
 * #24 0x000fd0d5 in kill_server_thread at mysqld.cc:1129
 */
if (!tab->tab_db->db_xlog.xlog_flush(ot->ot_thread))
5012
TRACE_FLUSH("FLUSHED xlog", thread, &st, &now, 0);
5015
if (!il->il_flush(ot))
5017
TRACE_FLUSH("FLUSHED ilog", thread, &st, &now, ILOG_FLUSH);
5019
if (!il->il_apply_log_write(ot))
5022
TRACE_FLUSH("wrote all blocks to index", thread, &st, &now, 0);
/*
 * 1.0.01 - I have split apply log into flush and write.
 *
 * I have to write before waiting threads can continue
 * because otherwise incorrect data will be read
 * when the cache block is freed before it is written.
 *
 * Risk: (see below).
 */
while ((block = tab->tab_ind_dirty_list.dl_next_block(&it))) {
5037
XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
5038
if (block->cb_state == IDX_CAC_BLOCK_LOGGED) {
5039
block->cb_state = IDX_CAC_BLOCK_CLEAN;
5040
ot->ot_thread->st_statistics.st_ind_cache_dirty--;
5042
XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
5045
/* This gives early notification to threads waiting for this operation
 * to get to this point.
 */
fit->fit_blocks_flushed = fit->fit_dirty_blocks;
5050
fit->fit_dirty_blocks = 0;
5051
xt_async_task_notify(fit);
/*
 * 1.0.01 - Note: I have moved the flush to here.
 * It allows the calling thread to continue during the
 * flush.
 *
 * The problem is, if the ilog flush fails, then we have
 * lost the information to re-create a consistent flush again!
 */
if (!il->il_apply_log_flush(ot))
5064
TRACE_FLUSH("FLUSHED index file", thread, &st, &now, INDEX_FLUSH);
5068
tab->tab_ind_dirty_list.dl_free_all();
5069
tab->tab_ind_flush_ilog = NULL;
5072
/* Mark this table as index flushed: */
5073
xt_checkpoint_set_flush_state(tab->tab_db, tab->tab_id, XT_CPT_STATE_DONE_INDEX);
5074
TRACE_FLUSH("set index state", thread, &st, &now, 0);
5076
#ifdef TRACE_FLUSH_INDEX
5078
printf("flush index (%d) %s DONE\n", (int) (time(NULL) - tnow), tab->tab_name->ps_path);
5083
xt_unlock_mutex_ns(&tab->tab_ind_flush_lock);
5084
TRACE_FLUSH("UN-LOCKED flush index lock", thread, &st, &now, 0);
5086
if (!xt_end_checkpoint(tab->tab_db, ot->ot_thread, NULL))
5089
#ifdef DEBUG_CHECK_IND_CACHE
5090
xt_ind_check_cache((XTIndex *) 1);
5095
indp = tab->tab_dic.dic_keys;
5096
for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
5098
XT_INDEX_UNLOCK(ind, ot);
5102
tab->tab_ind_dirty_list.dl_free_all();
5103
tab->tab_ind_flush_ilog = NULL;
5107
xt_checkpoint_set_flush_state(tab->tab_db, tab->tab_id, XT_CPT_STATE_STOP_INDEX);
5109
#ifdef TRACE_FLUSH_INDEX
5111
printf("flush index (%d) %s FAILED\n", (int) (time(NULL) - tnow), tab->tab_name->ps_path);
5116
xt_unlock_mutex_ns(&tab->tab_ind_flush_lock);
5117
#ifdef DEBUG_CHECK_IND_CACHE
5118
xt_ind_check_cache(NULL);
5123
void XTIndexLogPool::ilp_init(struct XTThread *self, struct XTDatabase *db, size_t log_buffer_size)
5125
char path[PATH_MAX];
5129
XTIndexLogPtr il = NULL;
5130
XTOpenTablePtr ot = NULL;
5133
ilp_log_buffer_size = log_buffer_size;
5134
xt_init_mutex_with_autoname(self, &ilp_lock);
5136
xt_strcpy(PATH_MAX, path, db->db_main_path);
5137
xt_add_system_dir(PATH_MAX, path);
5138
if (xt_fs_exists(path)) {
5139
pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
5140
while (xt_dir_next(self, od)) {
5141
file = xt_dir_name(self, od);
5142
if (xt_starts_with(file, "ilog")) {
5143
if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
5144
if (!ilp_open_log(&il, log_id, FALSE, self))
5146
if (il->il_tab_id && il->il_log_eof) {
5147
if (!il->il_open_table(&ot))
5150
if (!il->il_apply_log_write(ot))
5152
if (!il->il_apply_log_flush(ot))
5154
ot->ot_thread = self;
5155
il->il_close_table(ot);
5162
freer_(); // xt_dir_close(od)
5167
/* TODO: Mark index as corrupted: */
5169
il->il_close_table(ot);
5171
il->il_close(FALSE);
5175
void XTIndexLogPool::ilp_close(struct XTThread *XT_UNUSED(self), xtBool lock)
5180
xt_lock_mutex_ns(&ilp_lock);
5181
while ((il = ilp_log_pool)) {
5182
ilp_log_pool = il->il_next_in_pool;
5187
xt_unlock_mutex_ns(&ilp_lock);
5190
void XTIndexLogPool::ilp_exit(struct XTThread *self)
5192
ilp_close(self, FALSE);
5193
ASSERT_NS(il_pool_count == 0);
5194
xt_free_mutex(&ilp_lock);
5197
void XTIndexLogPool::ilp_name(size_t size, char *path, xtLogID log_id)
5201
sprintf(name, "ilog-%lu.xt", (u_long) log_id);
5202
xt_strcpy(size, path, ilp_db->db_main_path);
5203
xt_add_system_dir(size, path);
5204
xt_add_dir_char(size, path);
5205
xt_strcat(size, path, name);
5208
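/* Example of the path built by ilp_name() above (illustrative only): for
 * log_id 3 the file name is "ilog-3.xt", appended to the database main path
 * plus whatever directory xt_add_system_dir() adds, i.e. something like
 * <db-main-path>/<system-dir>/ilog-3.xt. The exact system directory name is
 * not visible in this listing.
 */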
xtBool XTIndexLogPool::ilp_open_log(XTIndexLogPtr *ret_il, xtLogID log_id, xtBool excl, XTThreadPtr thread)
5210
char log_path[PATH_MAX];
5212
XTIndLogHeadDRec log_head;
5215
ilp_name(PATH_MAX, log_path, log_id);
5216
if (!(il = (XTIndexLogPtr) xt_calloc_ns(sizeof(XTIndexLogRec))))
5218
xt_spinlock_init_with_autoname(NULL, &il->il_write_lock);
5219
il->il_log_id = log_id;
5222
/* Writes will be rounded up to the nearest direct write block size (see {WRITE-IN-BLOCKS}),
 * so make sure we have space in the buffer for that:
 */
if (IND_WRITE_BLOCK_SIZE < XT_BLOCK_SIZE_FOR_DIRECT_IO)
5228
#ifdef IND_WRITE_IN_BLOCK_SIZES
5229
if (XT_INDEX_PAGE_SIZE < IND_WRITE_BLOCK_SIZE)
5233
#ifdef IND_FILL_BLOCK_TO_NEXT
5234
if (!(il->il_buffer = (xtWord1 *) xt_malloc_ns(ilp_log_buffer_size + XT_INDEX_PAGE_SIZE)))
5237
if (!(il->il_buffer = (xtWord1 *) xt_malloc_ns(ilp_log_buffer_size + IND_WRITE_BLOCK_SIZE)))
5240
il->il_buffer_size = ilp_log_buffer_size;
5242
if (!(il->il_of = xt_open_file_ns(log_path, XT_FT_STANDARD, (excl ? XT_FS_EXCLUSIVE : 0) | XT_FS_CREATE | XT_FS_MAKE_PATH, 0)))
5245
if (!xt_pread_file(il->il_of, 0, sizeof(XTIndLogHeadDRec), 0, &log_head, &read_size, &thread->st_statistics.st_ilog, thread))
5248
if (read_size == sizeof(XTIndLogHeadDRec)) {
5249
il->il_tab_id = XT_GET_DISK_4(log_head.ilh_tab_id_4);
5250
il->il_log_eof = XT_GET_DISK_4(log_head.ilh_log_eof_4);
5261
il->il_close(FALSE);
5265
xtBool XTIndexLogPool::ilp_get_log(XTIndexLogPtr *ret_il, XTThreadPtr thread)
5270
xt_lock_mutex_ns(&ilp_lock);
5271
if ((il = ilp_log_pool)) {
5272
ilp_log_pool = il->il_next_in_pool;
5277
log_id = ilp_next_log_id;
5279
xt_unlock_mutex_ns(&ilp_lock);
5281
if (!ilp_open_log(&il, log_id, TRUE, thread))
5288
void XTIndexLogPool::ilp_release_log(XTIndexLogPtr il)
5290
xt_lock_mutex_ns(&ilp_lock);
5291
if (il_pool_count == 5)
5295
il->il_next_in_pool = ilp_log_pool;
5298
xt_unlock_mutex_ns(&ilp_lock);
5301
xtBool XTIndexLog::il_reset(struct XTOpenTable *ot)
5303
XTIndLogHeadDRec log_head;
5304
xtTableID tab_id = ot->ot_table->tab_id;
5309
il_buffer_offset = 0;
5311
/* We must write the header and flush here, or the "previous" status (from the
 * last flush run) could remain. A failure to write the file completely would leave the
 * old header in place while other parts of the file have been changed.
 * This would lead to index corruption.
 */
log_head.ilh_data_type = XT_DT_LOG_HEAD;
5317
XT_SET_DISK_4(log_head.ilh_tab_id_4, tab_id);
5318
XT_SET_DISK_4(log_head.ilh_log_eof_4, 0);
5320
if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5323
if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5329
xtBool XTIndexLog::il_data_written()
5331
return il_buffer_offset != 0 || il_buffer_len != 0;
5334
void XTIndexLog::il_close(xtBool delete_it)
5336
xtLogID log_id = il_log_id;
5339
xt_close_file_ns(il_of);
5343
if (delete_it && log_id) {
5344
char log_path[PATH_MAX];
5346
il_pool->ilp_name(PATH_MAX, log_path, log_id);
5347
xt_fs_delete(NULL, log_path);
5351
xt_free_ns(il_buffer);
5355
xt_spinlock_free(NULL, &il_write_lock);
5359
void XTIndexLog::il_release()
5361
il_pool->ilp_db->db_indlogs.ilp_release_log(this);
5364
xtBool XTIndexLog::il_require_space(size_t bytes, XTThreadPtr thread)
5366
if (il_buffer_len + bytes > il_buffer_size) {
5367
if (!xt_pwrite_file(il_of, il_buffer_offset, il_buffer_len, il_buffer, &thread->st_statistics.st_ilog, thread))
5369
il_buffer_offset += il_buffer_len;
5376
xtBool XTIndexLog::il_write_byte(struct XTOpenTable *ot, xtWord1 byte)
5378
if (!il_require_space(1, ot->ot_thread))
5380
*(il_buffer + il_buffer_len) = byte;
5385
xtBool XTIndexLog::il_write_word4(struct XTOpenTable *ot, xtWord4 value)
5389
if (!il_require_space(4, ot->ot_thread))
5391
buffer = il_buffer + il_buffer_len;
5392
XT_SET_DISK_4(buffer, value);
5398
* This function assumes that the block is xlocked!
5400
xtBool XTIndexLog::il_write_block(struct XTOpenTable *ot, XTIndBlockPtr block)
5402
xtIndexNodeID node_id;
5403
XTIdxBranchDPtr node;
5406
node_id = block->cb_address;
5407
node = (XTIdxBranchDPtr) block->cb_data;
5408
block_len = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(node->tb_size_2));
5410
//il_min_byte_count += (block->cb_max_pos - block->cb_min_pos) + (block->cb_header ? XT_INDEX_PAGE_HEAD_SIZE : 0);
5411
#ifdef IND_WRITE_MIN_DATA
5414
xtBool eo_block = FALSE;
5416
#ifdef IND_WRITE_IN_BLOCK_SIZES
5417
/* Round up to block boundary: */
5418
max_pos = ((block->cb_max_pos + XT_INDEX_PAGE_HEAD_SIZE + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
5419
if (max_pos > block_len)
5420
max_pos = block_len;
5421
max_pos -= XT_INDEX_PAGE_HEAD_SIZE;
5422
/* Round down to block boundary: */
5423
min_pos = ((block->cb_min_pos + XT_INDEX_PAGE_HEAD_SIZE) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
5425
min_pos -= XT_INDEX_PAGE_HEAD_SIZE;
5427
max_pos = block->cb_max_pos;
5428
min_pos = block->cb_min_pos;
5431
if (block_len == max_pos + XT_INDEX_PAGE_HEAD_SIZE)
5434
ASSERT_NS(max_pos <= XT_INDEX_PAGE_SIZE-XT_INDEX_PAGE_HEAD_SIZE);
5435
ASSERT_NS(min_pos <= block_len-XT_INDEX_PAGE_HEAD_SIZE);
5436
ASSERT_NS(max_pos <= block_len-XT_INDEX_PAGE_HEAD_SIZE);
5437
ASSERT_NS(min_pos <= max_pos);
5439
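/* A worked example of the rounding above, assuming IND_WRITE_BLOCK_SIZE = 512
 * and XT_INDEX_PAGE_HEAD_SIZE = 2 (both values are illustrative, not taken from
 * the headers): with cb_min_pos = 700 and cb_max_pos = 900, min_pos is rounded
 * DOWN to (702 / 512) * 512 - 2 = 510 and max_pos is rounded UP to
 * ((902 + 511) / 512) * 512 - 2 = 1022, so the logged range covers whole
 * direct-I/O blocks while still containing every modified byte.
 */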
xt_spinlock_lock(&il_write_lock);
5440
if (block->cb_min_pos == block->cb_max_pos) {
5441
/* This means just the header was changed. */
5442
#ifdef IND_WRITE_IN_BLOCK_SIZES
5443
XTIndShortPageDataDPtr sh_data;
5445
if (!il_require_space(offsetof(XTIndShortPageDataDRec, ild_data) + IND_WRITE_BLOCK_SIZE, ot->ot_thread))
5448
sh_data = (XTIndShortPageDataDPtr) (il_buffer + il_buffer_len);
5449
sh_data->ild_data_type = XT_DT_SHORT_IND_PAGE;
5450
XT_SET_DISK_4(sh_data->ild_page_id_4, XT_NODE_ID(node_id));
5451
XT_SET_DISK_2(sh_data->ild_size_2, IND_WRITE_BLOCK_SIZE);
5452
memcpy(sh_data->ild_data, block->cb_data, IND_WRITE_BLOCK_SIZE);
5453
il_buffer_len += offsetof(XTIndShortPageDataDRec, ild_data) + IND_WRITE_BLOCK_SIZE;
5455
XTIndSetPageHeadDataDPtr sph_data;
5457
if (!il_require_space(sizeof(XTIndSetPageHeadDataDRec), ot->ot_thread))
5460
ASSERT_NS(sizeof(XTIndSetPageHeadDataDRec) <= il_buffer_size);
5462
sph_data = (XTIndSetPageHeadDataDPtr) (il_buffer + il_buffer_len);
5463
sph_data->ild_data_type = XT_DT_SET_PAGE_HEAD;
5464
XT_SET_DISK_4(sph_data->ild_page_id_4, XT_NODE_ID(node_id));
5465
XT_COPY_DISK_2(sph_data->ild_page_head_2, block->cb_data);
5466
il_buffer_len += sizeof(XTIndSetPageHeadDataDRec);
5469
#ifdef IND_WRITE_IN_BLOCK_SIZES
5470
else if (min_pos == 0 || (block->cb_header && min_pos == IND_WRITE_BLOCK_SIZE - XT_INDEX_PAGE_HEAD_SIZE))
5472
else if (min_pos < 16 - XT_INDEX_PAGE_HEAD_SIZE)
5475
/* Fuse, and write the whole block: */
5477
XTIndPageDataDPtr p_data;
5479
if (!il_require_space(offsetof(XTIndPageDataDRec, ild_data) + block_len, ot->ot_thread))
5482
ASSERT_NS(offsetof(XTIndPageDataDRec, ild_data) + block_len <= il_buffer_size);
5484
p_data = (XTIndPageDataDPtr) (il_buffer + il_buffer_len);
5485
p_data->ild_data_type = XT_DT_INDEX_PAGE;
5486
XT_SET_DISK_4(p_data->ild_page_id_4, XT_NODE_ID(node_id));
5487
memcpy(p_data->ild_data, block->cb_data, block_len);
5488
il_buffer_len += offsetof(XTIndPageDataDRec, ild_data) + block_len;
5491
XTIndShortPageDataDPtr sp_data;
5493
block_len = max_pos + XT_INDEX_PAGE_HEAD_SIZE;
5495
if (!il_require_space(offsetof(XTIndShortPageDataDRec, ild_data) + block_len, ot->ot_thread))
5498
ASSERT_NS(offsetof(XTIndShortPageDataDRec, ild_data) + block_len <= il_buffer_size);
5500
sp_data = (XTIndShortPageDataDPtr) (il_buffer + il_buffer_len);
5501
sp_data->ild_data_type = XT_DT_SHORT_IND_PAGE;
5502
XT_SET_DISK_4(sp_data->ild_page_id_4, XT_NODE_ID(node_id));
5503
XT_SET_DISK_2(sp_data->ild_size_2, block_len);
5504
memcpy(sp_data->ild_data, block->cb_data, block_len);
5505
il_buffer_len += offsetof(XTIndShortPageDataDRec, ild_data) + block_len;
5509
block_len = max_pos - min_pos;
5511
if (block->cb_header) {
5512
#ifdef IND_WRITE_IN_BLOCK_SIZES
5513
XTIndDoubleModPageDataDPtr dd_data;
5515
ASSERT_NS(min_pos + XT_INDEX_PAGE_HEAD_SIZE >= 2*IND_WRITE_BLOCK_SIZE);
5516
ASSERT_NS((min_pos + XT_INDEX_PAGE_HEAD_SIZE) % IND_WRITE_BLOCK_SIZE == 0);
5517
if (!il_require_space(offsetof(XTIndDoubleModPageDataDRec, dld_data) + IND_WRITE_BLOCK_SIZE + block_len, ot->ot_thread))
5520
dd_data = (XTIndDoubleModPageDataDPtr) (il_buffer + il_buffer_len);
5521
dd_data->dld_data_type = eo_block ? XT_DT_2_MOD_IND_PAGE_EOB : XT_DT_2_MOD_IND_PAGE;
5522
XT_SET_DISK_4(dd_data->dld_page_id_4, XT_NODE_ID(node_id));
5523
XT_SET_DISK_2(dd_data->dld_size1_2, IND_WRITE_BLOCK_SIZE);
5524
XT_SET_DISK_2(dd_data->dld_offset2_2, min_pos);
5525
XT_SET_DISK_2(dd_data->dld_size2_2, block_len);
5526
memcpy(dd_data->dld_data, block->cb_data, IND_WRITE_BLOCK_SIZE);
5527
memcpy(dd_data->dld_data + IND_WRITE_BLOCK_SIZE, block->cb_data + XT_INDEX_PAGE_HEAD_SIZE + min_pos, block_len);
5528
il_buffer_len += offsetof(XTIndDoubleModPageDataDRec, dld_data) + IND_WRITE_BLOCK_SIZE + block_len;
5530
XTIndModPageHeadDataDPtr mph_data;
5532
if (!il_require_space(offsetof(XTIndModPageHeadDataDRec, ild_data) + block_len, ot->ot_thread))
5535
mph_data = (XTIndModPageHeadDataDPtr) (il_buffer + il_buffer_len);
5536
mph_data->ild_data_type = eo_block ? XT_DT_MOD_IND_PAGE_HEAD_EOB : XT_DT_MOD_IND_PAGE_HEAD;
5537
XT_SET_DISK_4(mph_data->ild_page_id_4, XT_NODE_ID(node_id));
5538
XT_SET_DISK_2(mph_data->ild_size_2, block_len);
5539
XT_SET_DISK_2(mph_data->ild_offset_2, min_pos);
5540
XT_COPY_DISK_2(mph_data->ild_page_head_2, block->cb_data);
5541
memcpy(mph_data->ild_data, block->cb_data + XT_INDEX_PAGE_HEAD_SIZE + min_pos, block_len);
5542
il_buffer_len += offsetof(XTIndModPageHeadDataDRec, ild_data) + block_len;
5546
XTIndModPageDataDPtr mp_data;
5548
if (!il_require_space(offsetof(XTIndModPageDataDRec, ild_data) + block_len, ot->ot_thread))
5551
mp_data = (XTIndModPageDataDPtr) (il_buffer + il_buffer_len);
5552
mp_data->ild_data_type = eo_block ? XT_DT_MOD_IND_PAGE_EOB : XT_DT_MOD_IND_PAGE;
5553
XT_SET_DISK_4(mp_data->ild_page_id_4, XT_NODE_ID(node_id));
5554
XT_SET_DISK_2(mp_data->ild_size_2, block_len);
5555
XT_SET_DISK_2(mp_data->ild_offset_2, min_pos);
5556
memcpy(mp_data->ild_data, block->cb_data + XT_INDEX_PAGE_HEAD_SIZE + min_pos, block_len);
5557
il_buffer_len += offsetof(XTIndModPageDataDRec, ild_data) + block_len;
5561
block->cb_header = FALSE;
5562
block->cb_min_pos = 0xFFFF;
5563
block->cb_max_pos = 0;
5565
#else // IND_OPT_DATA_WRITTEN
5566
XTIndPageDataDPtr page_data;
5568
if (!il_require_space(offsetof(XTIndPageDataDRec, ild_data) + block_len, ot->ot_thread))
5571
ASSERT_NS(offsetof(XTIndPageDataDRec, ild_data) + XT_INDEX_PAGE_SIZE <= il_buffer_size);
5573
page_data = (XTIndPageDataDPtr) (il_buffer + il_buffer_len);
5574
TRACK_BLOCK_TO_FLUSH(node_id);
5575
page_data->ild_data_type = XT_DT_INDEX_PAGE;
5576
XT_SET_DISK_4(page_data->ild_page_id_4, XT_NODE_ID(node_id));
5577
memcpy(page_data->ild_data, block->cb_data, block_len);
5579
il_buffer_len += offsetof(XTIndPageDataDRec, ild_data) + block_len;
5581
#endif // IND_OPT_DATA_WRITTEN
5582
xt_spinlock_unlock(&il_write_lock);
5584
ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FLUSHING);
5585
block->cb_state = IDX_CAC_BLOCK_LOGGED;
5587
TRACK_BLOCK_TO_FLUSH(node_id);
5591
xt_spinlock_unlock(&il_write_lock);
5595
xtBool XTIndexLog::il_write_header(struct XTOpenTable *ot, size_t head_size, xtWord1 *head_buf)
5597
XTIndHeadDataDPtr head_data;
5599
if (!il_require_space(offsetof(XTIndHeadDataDRec, ilh_data) + head_size, ot->ot_thread))
5602
head_data = (XTIndHeadDataDPtr) (il_buffer + il_buffer_len);
5603
head_data->ilh_data_type = XT_DT_HEADER;
5604
XT_SET_DISK_2(head_data->ilh_head_size_2, head_size);
5605
memcpy(head_data->ilh_data, head_buf, head_size);
5607
il_buffer_len += offsetof(XTIndHeadDataDRec, ilh_data) + head_size;
5612
xtBool XTIndexLog::il_flush(struct XTOpenTable *ot)
5614
XTIndLogHeadDRec log_head;
5615
xtTableID tab_id = ot->ot_table->tab_id;
5617
if (il_buffer_len) {
5618
if (!xt_pwrite_file(il_of, il_buffer_offset, il_buffer_len, il_buffer, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5620
il_buffer_offset += il_buffer_len;
5624
if (il_log_eof != il_buffer_offset) {
5625
log_head.ilh_data_type = XT_DT_LOG_HEAD;
5626
XT_SET_DISK_4(log_head.ilh_tab_id_4, tab_id);
5627
XT_SET_DISK_4(log_head.ilh_log_eof_4, il_buffer_offset);
5629
if (!XT_IS_TEMP_TABLE(ot->ot_table->tab_dic.dic_tab_flags)) {
5630
if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5634
if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5637
if (!XT_IS_TEMP_TABLE(ot->ot_table->tab_dic.dic_tab_flags)) {
5638
if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5643
il_log_eof = il_buffer_offset;
5648
#ifdef CHECK_IF_WRITE_WAS_OK
5649
static void check_buff(void *in_a, void *in_b, int len)
5651
xtWord1 *a = (xtWord1 *) in_a;
5652
xtWord1 *b = (xtWord1 *) in_b;
5655
while (offset < len) {
5657
printf("Missmatch at offset = %d %x != %x\n", offset, (int) *a, (int) *b);
5668
xtBool XTIndexLog::il_apply_log_write(struct XTOpenTable *ot)
5671
register XTTableHPtr tab = ot->ot_table;
5676
xtIndexNodeID node_id;
5677
size_t req_size = 0;
5678
XTIdxBranchDPtr node;
5681
xtWord1 *block_header;
5682
xtWord1 *block_data;
5683
#ifdef CHECK_IF_WRITE_WAS_OK
5684
XTIndReferenceRec c_iref;
5685
XTIdxBranchDPtr c_node;
5690
while (offset < il_log_eof) {
5691
if (offset < il_buffer_offset ||
5692
offset >= il_buffer_offset + (off_t) il_buffer_len) {
5693
il_buffer_len = il_buffer_size;
5694
if (il_log_eof - offset < (off_t) il_buffer_len)
5695
il_buffer_len = (size_t) (il_log_eof - offset);
5698
if (il_buffer_len < req_size) {
5699
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_LOG_CORRUPT, xt_file_path(il_of));
5700
xt_log_and_clear_exception_ns();
5703
if (!xt_pread_file(il_of, offset, il_buffer_len, il_buffer_len, il_buffer, NULL, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5705
il_buffer_offset = offset;
5707
pos = (size_t) (offset - il_buffer_offset);
5708
ASSERT_NS(pos < il_buffer_len);
5709
#ifdef CHECK_IF_WRITE_WAS_OK
5712
buffer = il_buffer + pos;
5714
case XT_DT_LOG_HEAD:
5715
req_size = sizeof(XTIndLogHeadDRec);
5716
if (il_buffer_len - pos < req_size) {
5723
case XT_DT_SHORT_IND_PAGE: {
5724
XTIndShortPageDataDPtr sp_data;
5726
req_size = offsetof(XTIndShortPageDataDRec, ild_data);
5727
if (il_buffer_len - pos < req_size) {
5732
sp_data = (XTIndShortPageDataDPtr) buffer;
5733
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(sp_data->ild_page_id_4));
5734
block_len = XT_GET_DISK_2(sp_data->ild_size_2);
5735
block_data = sp_data->ild_data;
5738
case XT_DT_INDEX_PAGE:
5739
XTIndPageDataDPtr p_data;
5741
req_size = offsetof(XTIndPageDataDRec, ild_data);
5742
if (il_buffer_len - pos < req_size + 2) {
5747
p_data = (XTIndPageDataDPtr) buffer;
5748
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(p_data->ild_page_id_4));
5749
node = (XTIdxBranchDPtr) p_data->ild_data;
5750
block_len = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(node->tb_size_2));
5751
block_data = p_data->ild_data;
5754
if (block_len < 2 || block_len > XT_INDEX_PAGE_SIZE) {
5755
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
5759
req_size += block_len;
5760
#ifdef IND_FILL_BLOCK_TO_NEXT
5761
/* Make sure we have the start of the next block in the buffer:
 * Should always work because a XT_DT_INDEX_PAGE is never the last
 * record in the log.
 */
if (il_buffer_len - pos < req_size + offsetof(XTIndPageDataDRec, ild_data))
5767
if (il_buffer_len - pos < req_size)
5774
TRACK_BLOCK_FLUSH_N(node_id);
5775
address = xt_ind_node_to_offset(tab, node_id);
5776
#ifdef IND_WRITE_IN_BLOCK_SIZES
5777
/* {WRITE-IN-BLOCKS} Round up the block size. Space has been provided. */
5778
block_len = ((block_len + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
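/* Standard round-up-to-multiple idiom: e.g. with IND_WRITE_BLOCK_SIZE = 512
 * (an illustrative value), a block_len of 700 becomes ((700 + 511) / 512) * 512 = 1024,
 * while an exact multiple such as 1024 is left unchanged. */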
5780
IDX_TRACE("%d- W%x\n", (int) XT_NODE_ID(node_id), (int) XT_GET_DISK_2(block_data));
5781
#ifdef IND_FILL_BLOCK_TO_NEXT
5782
if (block_len < XT_INDEX_PAGE_SIZE) {
5783
XTIndPageDataDPtr next_page_data;
5785
next_page_data = (XTIndPageDataDPtr) (buffer + req_size);
5786
if (next_page_data->ild_data_type == XT_DT_INDEX_PAGE) {
5787
xtIndexNodeID next_node_id;
5789
next_node_id = XT_RET_NODE_ID(XT_GET_DISK_4(next_page_data->ild_page_id_4));
5790
/* Write the whole page, if that means leaving no gaps! */
5791
if (next_node_id == node_id+1)
5792
block_len = XT_INDEX_PAGE_SIZE;
5796
ASSERT_NS(block_len >= 2 && block_len <= XT_INDEX_PAGE_SIZE);
5797
if (!il_pwrite_file(ot, address, block_len, block_data))
5803
case XT_DT_SET_PAGE_HEAD: {
5804
XTIndSetPageHeadDataDPtr sph_data;
5806
req_size = sizeof(XTIndSetPageHeadDataDRec);
5807
if (il_buffer_len - pos < req_size) {
5812
sph_data = (XTIndSetPageHeadDataDPtr) buffer;
5813
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(sph_data->ild_page_id_4));
5816
block_header = sph_data->ild_page_head_2;
5820
case XT_DT_MOD_IND_PAGE_HEAD:
5821
case XT_DT_MOD_IND_PAGE_HEAD_EOB: {
5822
XTIndModPageHeadDataDPtr mph_data;
5824
req_size = offsetof(XTIndModPageHeadDataDRec, ild_data);
5825
if (il_buffer_len - pos < req_size) {
5830
mph_data = (XTIndModPageHeadDataDPtr) buffer;
5831
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(mph_data->ild_page_id_4));
5832
block_offset = XT_GET_DISK_2(mph_data->ild_offset_2);
5833
block_len = XT_GET_DISK_2(mph_data->ild_size_2);
5834
block_header = mph_data->ild_page_head_2;
5835
block_data = mph_data->ild_data;
5838
case XT_DT_MOD_IND_PAGE:
5839
case XT_DT_MOD_IND_PAGE_EOB:
5840
XTIndModPageDataDPtr mp_data;
5842
req_size = offsetof(XTIndModPageDataDRec, ild_data);
5843
if (il_buffer_len - pos < req_size) {
5848
mp_data = (XTIndModPageDataDPtr) buffer;
5849
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(mp_data->ild_page_id_4));
5850
block_offset = XT_GET_DISK_2(mp_data->ild_offset_2);
5851
block_len = XT_GET_DISK_2(mp_data->ild_size_2);
5852
block_header = NULL;
5853
block_data = mp_data->ild_data;
5856
if (block_offset + block_len > XT_INDEX_PAGE_DATA_SIZE) {
5857
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
5861
req_size += block_len;
5862
if (il_buffer_len - pos < req_size) {
5867
TRACK_BLOCK_FLUSH_N(node_id);
5868
address = xt_ind_node_to_offset(tab, node_id);
5869
/* {WRITE-IN-BLOCKS} Round up the block size. Space has been provided. */
5870
IDX_TRACE("%d- W%x\n", (int) XT_NODE_ID(node_id), (int) XT_GET_DISK_2(block_data));
5872
if (!il_pwrite_file(ot, address, 2, block_header))
5877
#ifdef IND_WRITE_IN_BLOCK_SIZES
5878
if (*buffer == XT_DT_MOD_IND_PAGE_HEAD_EOB || *buffer == XT_DT_MOD_IND_PAGE_EOB)
5879
block_len = ((block_len + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
5881
if (!il_pwrite_file(ot, address + XT_INDEX_PAGE_HEAD_SIZE + block_offset, block_len, block_data))
5888
case XT_DT_FREE_LIST:
5889
xtWord4 block, nblock;
5891
xtWord1 buffer[IND_WRITE_BLOCK_SIZE];
5892
XTIndFreeBlockRec free_block;
5896
memset(x.buffer, 0, sizeof(XTIndFreeBlockRec));
5903
if (il_buffer_len - pos < req_size) {
5904
il_buffer_len = il_buffer_size;
5905
if (il_log_eof - offset < (off_t) il_buffer_len)
5906
il_buffer_len = (size_t) (il_log_eof - offset);
5908
if (il_buffer_len < req_size) {
5909
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_LOG_CORRUPT, xt_file_path(il_of));
5910
xt_log_and_clear_exception_ns();
5913
if (!xt_pread_file(il_of, offset, il_buffer_len, il_buffer_len, il_buffer, NULL, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
5917
block = XT_GET_DISK_4(il_buffer + pos);
5918
nblock = XT_GET_DISK_4(il_buffer + pos + 4);
5919
if (nblock == 0xFFFFFFFF)
5921
aoff = xt_ind_node_to_offset(tab, XT_RET_NODE_ID(block));
5922
XT_SET_DISK_8(x.free_block.if_next_block_8, nblock);
5923
IDX_TRACE("%d- *%x\n", (int) block, (int) XT_GET_DISK_2(x.buffer));
5924
if (!il_pwrite_file(ot, aoff, IND_WRITE_BLOCK_SIZE, x.buffer))
5934
XTIndHeadDataDPtr head_data;
5937
req_size = offsetof(XTIndHeadDataDRec, ilh_data);
5938
if (il_buffer_len - pos < req_size) {
5942
head_data = (XTIndHeadDataDPtr) buffer;
5943
len = XT_GET_DISK_2(head_data->ilh_head_size_2);
5945
req_size = offsetof(XTIndHeadDataDRec, ilh_data) + len;
5946
if (il_buffer_len - pos < req_size) {
5951
if (!il_pwrite_file(ot, 0, len, head_data->ilh_data))
5957
case XT_DT_2_MOD_IND_PAGE:
5958
case XT_DT_2_MOD_IND_PAGE_EOB:
5959
XTIndDoubleModPageDataDPtr dd_data;
5962
req_size = offsetof(XTIndDoubleModPageDataDRec, dld_data);
5963
if (il_buffer_len - pos < req_size) {
5968
dd_data = (XTIndDoubleModPageDataDPtr) buffer;
5969
node_id = XT_RET_NODE_ID(XT_GET_DISK_4(dd_data->dld_page_id_4));
5970
block_len = XT_GET_DISK_2(dd_data->dld_size1_2);
5971
block_offset = XT_GET_DISK_2(dd_data->dld_offset2_2);
5972
block_len2 = XT_GET_DISK_2(dd_data->dld_size2_2);
5973
block_data = dd_data->dld_data;
5975
req_size += block_len + block_len2;
5976
if (il_buffer_len - pos < req_size)
5982
TRACK_BLOCK_FLUSH_N(node_id);
5983
address = xt_ind_node_to_offset(tab, node_id);
5984
IDX_TRACE("%d- W%x\n", (int) XT_NODE_ID(node_id), (int) XT_GET_DISK_2(block_data));
5985
if (!il_pwrite_file(ot, address, block_len, block_data))
5988
#ifdef IND_WRITE_IN_BLOCK_SIZES
5989
if (*buffer == XT_DT_2_MOD_IND_PAGE_EOB)
5990
block_len2 = ((block_len2 + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
5992
if (!il_pwrite_file(ot, address + XT_INDEX_PAGE_HEAD_SIZE + block_offset, block_len2, block_data + block_len))
5999
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_LOG_CORRUPT, xt_file_path(il_of));
6000
xt_log_and_clear_exception_ns();
6003
#ifdef CHECK_IF_WRITE_WAS_OK
6005
if (!xt_ind_get(ot, node_id, &c_iref))
6007
if (c_iref.ir_block) {
6008
c_node = (XTIdxBranchDPtr) c_iref.ir_block->cb_data;
6009
c_block_len = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(c_node->tb_size_2));
6011
if (!xt_pread_file(ot->ot_ind_file, address, XT_INDEX_PAGE_SIZE, 0, &ot->ot_ind_tmp_buf, NULL, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
6013
if (c_iref.ir_block->cb_min_pos == 0xFFFF)
6014
check_buff(&ot->ot_ind_tmp_buf, c_node, c_block_len);
6016
if (!c_iref.ir_block->cb_header)
6017
check_buff(&ot->ot_ind_tmp_buf, c_node, 2);
6018
check_buff(ot->ot_ind_tmp_buf.tb_data, c_node->tb_data, c_iref.ir_block->cb_min_pos);
6019
check_buff(ot->ot_ind_tmp_buf.tb_data + c_iref.ir_block->cb_max_pos,
6020
c_node->tb_data + c_iref.ir_block->cb_max_pos,
6021
c_block_len - XT_INDEX_PAGE_HEAD_SIZE - c_iref.ir_block->cb_max_pos);
6023
xt_ind_release(ot, NULL, XT_UNLOCK_WRITE, &c_iref);
6027
if (il_bytes_written >= IND_FLUSH_THRESHOLD) {
6028
if (!il_flush_file(ot))
6035
xtBool XTIndexLog::il_apply_log_flush(struct XTOpenTable *ot)
6037
register XTTableHPtr tab = ot->ot_table;
6038
XTIndLogHeadDRec log_head;
6040
#ifdef PRINT_IND_FLUSH_STATS
6041
xtWord8 b_flush_time = ot->ot_thread->st_statistics.st_ind.ts_flush_time;
6043
if (!il_flush_file(ot))
6045
#ifdef PRINT_IND_FLUSH_STATS
6053
ot->ot_table->tab_ind_flush_time += ot->ot_thread->st_statistics.st_ind.ts_flush_time - b_flush_time;
6054
ot->ot_table->tab_ind_flush++;
6056
time = (double) ot->ot_table->tab_ind_flush_time / (double) 1000000 / (double) ot->ot_table->tab_ind_flush;
6057
kb = (double) ot->ot_table->tab_ind_write / (double) ot->ot_table->tab_ind_flush / (double) 1024;
6058
printf("TIME: %s Kbytes: %s Mps: %s Flush Count: %d\n",
6059
idx_format(buf1, time),
6060
idx_format(buf2, kb),
6061
idx_format_mb(buf3, kb / time),
6062
(int) ot->ot_table->tab_ind_flush
6066
log_head.ilh_data_type = XT_DT_LOG_HEAD;
6067
XT_SET_DISK_4(log_head.ilh_tab_id_4, il_tab_id);
6068
XT_SET_DISK_4(log_head.ilh_log_eof_4, 0);
6070
if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
6073
if (!XT_IS_TEMP_TABLE(tab->tab_dic.dic_tab_flags)) {
6074
if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
6080
inline xtBool XTIndexLog::il_pwrite_file(struct XTOpenTable *ot, off_t offs, size_t siz, void *dat)
6082
#ifdef IND_WRITE_IN_BLOCK_SIZES
6083
ASSERT_NS(((offs) % IND_WRITE_BLOCK_SIZE) == 0);
6084
ASSERT_NS(((siz) % IND_WRITE_BLOCK_SIZE) == 0);
6086
il_bytes_written += siz;
6087
#ifdef PRINT_IND_FLUSH_STATS
6090
u_int b_write = ot->ot_thread->st_statistics.st_ind.ts_write;
6091
ok = xt_pwrite_file(ot->ot_ind_file, offs, siz, dat, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread);
6092
ot->ot_table->tab_ind_write += ot->ot_thread->st_statistics.st_ind.ts_write - b_write;
6095
return xt_pwrite_file(ot->ot_ind_file, offs, siz, dat, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread);
6099
inline xtBool XTIndexLog::il_flush_file(struct XTOpenTable *ot)
6103
il_bytes_written = 0;
6104
if (!XT_IS_TEMP_TABLE(ot->ot_table->tab_dic.dic_tab_flags)) {
6105
ok = xt_flush_file(ot->ot_ind_file, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread);
6110
xtBool XTIndexLog::il_open_table(struct XTOpenTable **ot)
6112
return xt_db_open_pool_table_ns(ot, il_pool->ilp_db, il_tab_id);
6115
void XTIndexLog::il_close_table(struct XTOpenTable *ot)
6117
xt_db_return_table_to_pool_ns(ot);