/* Copyright (c) 2005 PrimeBase Technologies GmbH
 *
 * PrimeBase XT
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * 2005-09-30	Paul McCullagh
 *
 * H&G2JCtL
 */
#include "xt_config.h"

#ifdef DRIZZLED
#include <bitset>
#endif

#include <string.h>
#include <stdio.h>
#include <stddef.h>
#ifndef XT_WIN
#include <strings.h>
#endif
#include <zlib.h>
/* This header not available on suse-11-amd64, ubuntu-9.10-amd64 */
//#include <bzlib.h>

#ifdef DRIZZLED
#include <drizzled/base.h>
using namespace drizzled;
#else
#include "mysql_priv.h"
#endif

#include "pthread_xt.h"
#include "memory_xt.h"
#include "index_xt.h"
#include "heap_xt.h"
#include "database_xt.h"
#include "strutil_xt.h"
#include "cache_xt.h"
#include "myxt_xt.h"
#include "trace_xt.h"
#include "table_xt.h"

#ifdef DEBUG
#define MAX_SEARCH_DEPTH			32
//#define CHECK_AND_PRINT
//#define CHECK_NODE_REFERENCE
//#define TRACE_FLUSH_INDEX
//#define CHECK_PRINTS_RECORD_REFERENCES
//#define DO_COMP_TEST
#define DUMP_INDEX
#else
#define MAX_SEARCH_DEPTH			100
#endif

//#define TRACE_FLUSH_TIMES

typedef struct IdxStackItem {
	XTIdxItemRec			i_pos;
	xtIndexNodeID			i_branch;
} IdxStackItemRec, *IdxStackItemPtr;

typedef struct IdxBranchStack {
	int						s_top;
	IdxStackItemRec			s_elements[MAX_SEARCH_DEPTH];
} IdxBranchStackRec, *IdxBranchStackPtr;

#ifdef DEBUG
#ifdef TEST_CODE
static void idx_check_on_key(XTOpenTablePtr ot);
#endif
static u_int idx_check_index(XTOpenTablePtr ot, XTIndexPtr ind, xtBool with_lock);
#endif

static xtBool idx_insert_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackPtr stack, xtBool last_item, XTIdxKeyValuePtr key_value, xtIndexNodeID branch);
static xtBool idx_remove_lazy_deleted_item_in_node(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID current, XTIndReferencePtr iref, XTIdxKeyValuePtr key_value);

#ifdef XT_TRACK_INDEX_UPDATES

static xtBool ind_track_write(struct XTOpenTable *ot, struct XTIndex *ind, xtIndexNodeID offset, size_t size, xtWord1 *data)
{
	ot->ot_ind_reads++;
	return xt_ind_write(ot, ind, offset, size, data);
}

#define XT_IND_WRITE					ind_track_write

#else

#define XT_IND_WRITE					xt_ind_write

#endif

#ifdef CHECK_NODE_REFERENCE
#define IDX_GET_NODE_REF(t, x, o)		idx_get_node_ref(t, x, o)
#else
#define IDX_GET_NODE_REF(t, x, o)		XT_GET_NODE_REF(t, (x) - (o))
#endif

/*
 * -----------------------------------------------------------------------
 * DEBUG ACTIVITY
 */

//#define TRACK_ACTIVITY

#ifdef TRACK_ACTIVITY
#define TRACK_MAX_BLOCKS			2000

typedef struct TrackBlock {
	xtWord1				exists;
	char				*activity;
} TrackBlockRec, *TrackBlockPtr;

TrackBlockRec		blocks[TRACK_MAX_BLOCKS];

xtPublic void track_work(u_int block, char *what)
{
	int len = 0, len2;

	ASSERT_NS(block > 0 && block <= TRACK_MAX_BLOCKS);
	block--;
	if (blocks[block].activity)
		len = strlen(blocks[block].activity);
	len2 = strlen(what);
	xt_realloc_ns((void **) &blocks[block].activity, len + len2 + 1);
	memcpy(blocks[block].activity + len, what, len2 + 1);
}

static void track_block_exists(xtIndexNodeID block)
{
	if (XT_NODE_ID(block) > 0 && XT_NODE_ID(block) <= TRACK_MAX_BLOCKS)
		blocks[XT_NODE_ID(block)-1].exists = TRUE;
}

static void track_reset_missing()
{
	for (u_int i=0; i<TRACK_MAX_BLOCKS; i++)
		blocks[i].exists = FALSE;
}

static void track_dump_missing(xtIndexNodeID eof_block)
{
	for (u_int i=0; i<XT_NODE_ID(eof_block)-1; i++) {
		if (!blocks[i].exists)
			printf("block missing = %04d %s\n", i+1, blocks[i].activity);
	}
}

static void track_dump_all(u_int max_block)
{
	for (u_int i=0; i<max_block; i++) {
		if (blocks[i].exists)
			printf(" %04d %s\n", i+1, blocks[i].activity);
		else
			printf("-%04d %s\n", i+1, blocks[i].activity);
	}
}

#endif

xtPublic void xt_ind_track_dump_block(XTTableHPtr XT_UNUSED(tab), xtIndexNodeID XT_UNUSED(address))
{
#ifdef TRACK_ACTIVITY
	u_int i = XT_NODE_ID(address)-1;

	printf("BLOCK %04d %s\n", i+1, blocks[i].activity);
#endif
}

#ifdef CHECK_NODE_REFERENCE
static xtIndexNodeID idx_get_node_ref(XTTableHPtr tab, xtWord1 *ref, u_int node_ref_size)
{
	xtIndexNodeID node;

	/* Node is invalid by default: */
	XT_NODE_ID(node) = 0xFFFFEEEE;
	if (node_ref_size) {
		ref -= node_ref_size;
		node = XT_RET_NODE_ID(XT_GET_DISK_4(ref));
		if (node >= tab->tab_ind_eof) {
			xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
		}
	}
	return node;
}
#endif

/*
 * -----------------------------------------------------------------------
 * Stack functions
 */

static void idx_newstack(IdxBranchStackPtr stack)
{
	stack->s_top = 0;
}

static xtBool idx_push(IdxBranchStackPtr stack, xtIndexNodeID n, XTIdxItemPtr pos)
{
	if (stack->s_top == MAX_SEARCH_DEPTH) {
		xt_register_error(XT_REG_CONTEXT, XT_ERR_STACK_OVERFLOW, 0, "Index node stack overflow");
		return FAILED;
	}
	stack->s_elements[stack->s_top].i_branch = n;
	if (pos)
		stack->s_elements[stack->s_top].i_pos = *pos;
	stack->s_top++;
	return OK;
}

static IdxStackItemPtr idx_pop(IdxBranchStackPtr stack)
{
	if (stack->s_top == 0)
		return NULL;
	stack->s_top--;
	return &stack->s_elements[stack->s_top];
}

static IdxStackItemPtr idx_top(IdxBranchStackPtr stack)
{
	if (stack->s_top == 0)
		return NULL;
	return &stack->s_elements[stack->s_top-1];
}

/*
 * -----------------------------------------------------------------------
 * Allocation of nodes
 */

/*
 * Allocating and freeing blocks for an index is safe because this is a structural
 * change which requires an exclusive lock on the index!
 */
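/* An overview of how idx_new_branch() below finds a page, in order of
 * preference (a summary of the code, not part of the original source):
 *
 *   1. The index's private free list (mi_free_list), filled by
 *      idx_free_branch() further down.
 *   2. The table's cached free lists (tab_ind_free_list).
 *   3. The on-disk free-block chain: tab_ind_free holds the first free
 *      page, and each free page stores the ID of the next one in
 *      if_next_block_8 (a singly linked list on disk).
 *   4. Failing all of the above, the index file is extended at tab_ind_eof.
 *
 * Illustrative walk of the on-disk chain (step 3), assuming pages 7 and 12
 * are free and tab_ind_free == 7:
 *
 *   tab_ind_free = 7  ->  page 7.if_next_block_8 = 12
 *                     ->  page 12.if_next_block_8 = 0 (end of chain)
 *
 * Allocating one page pops 7 off the head and sets tab_ind_free = 12.
 */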
static xtBool idx_new_branch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID *address)
{
	register XTTableHPtr	tab;
	xtIndexNodeID			wrote_pos;
	XTIndFreeBlockRec		free_block;
	XTIndFreeListPtr		list_ptr;

	tab = ot->ot_table;

	//ASSERT_NS(XT_INDEX_HAVE_XLOCK(ind, ot));
	if (ind->mi_free_list && ind->mi_free_list->fl_free_count) {
		ind->mi_free_list->fl_free_count--;
		*address = ind->mi_free_list->fl_page_id[ind->mi_free_list->fl_free_count];
		TRACK_BLOCK_ALLOC(*address);
		return OK;
	}

	xt_lock_mutex_ns(&tab->tab_ind_lock);

	/* Check the cached free list: */
	while ((list_ptr = tab->tab_ind_free_list)) {
		if (list_ptr->fl_start < list_ptr->fl_free_count) {
			wrote_pos = list_ptr->fl_page_id[list_ptr->fl_start];
			list_ptr->fl_start++;
			xt_unlock_mutex_ns(&tab->tab_ind_lock);
			*address = wrote_pos;
			TRACK_BLOCK_ALLOC(wrote_pos);
			return OK;
		}
		tab->tab_ind_free_list = list_ptr->fl_next_list;
		xt_free_ns(list_ptr);
	}

	if ((XT_NODE_ID(wrote_pos) = XT_NODE_ID(tab->tab_ind_free))) {
		/* Use the block on the free list: */
		if (!xt_ind_read_bytes(ot, ind, wrote_pos, sizeof(XTIndFreeBlockRec), (xtWord1 *) &free_block))
			goto failed;
		XT_NODE_ID(tab->tab_ind_free) = (xtIndexNodeID) XT_GET_DISK_8(free_block.if_next_block_8);
		xt_unlock_mutex_ns(&tab->tab_ind_lock);
		*address = wrote_pos;
		TRACK_BLOCK_ALLOC(wrote_pos);
		return OK;
	}

	/* PMC - Don't allow overflow! */
	if (XT_NODE_ID(tab->tab_ind_eof) >= 0xFFFFFFF) {
		xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_FILE_TO_LARGE, xt_file_path(ot->ot_ind_file));
		goto failed;
	}
	*address = tab->tab_ind_eof;
	XT_NODE_ID(tab->tab_ind_eof)++;
	xt_unlock_mutex_ns(&tab->tab_ind_lock);
	TRACK_BLOCK_ALLOC(*address);
	return OK;

	failed:
	xt_unlock_mutex_ns(&tab->tab_ind_lock);
	return FAILED;
}

/* Add the block to the private free list of the index.
 * On flush, this list will be transferred to the global list.
 */
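/* A short worked example of the sorted insert below (illustrative only):
 * the private free list is kept sorted by node ID, and the while loop is a
 * binary search for the insert position.
 *
 * Suppose fl_page_id = [3, 7, 12] (count = 3) and we free node 9:
 *
 *   i=0, count=3: guess=(0+2)>>1=1, page[1]=7  < 9  -> i=2
 *   i=2, count=3: guess=(2+2)>>1=2, page[2]=12 > 9  -> count=2
 *   i=2, count=2: loop ends
 *
 * Node 9 is inserted at position i=2, giving [3, 7, 9, 12].
 */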
static xtBool idx_free_branch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID node_id)
{
	register u_int		count;
	register u_int		i;
	register u_int		guess;

	TRACK_BLOCK_FREE(node_id);
	//ASSERT_NS(XT_INDEX_HAVE_XLOCK(ind, ot));
	if (!ind->mi_free_list) {
		count = 0;
		if (!(ind->mi_free_list = (XTIndFreeListPtr) xt_calloc_ns(offsetof(XTIndFreeListRec, fl_page_id) + 10 * sizeof(xtIndexNodeID))))
			return FAILED;
	}
	else {
		count = ind->mi_free_list->fl_free_count;
		if (!xt_realloc_ns((void **) &ind->mi_free_list, offsetof(XTIndFreeListRec, fl_page_id) + (count + 1) * sizeof(xtIndexNodeID)))
			return FAILED;
	}

	i = 0;
	while (i < count) {
		guess = (i + count - 1) >> 1;
		if (XT_NODE_ID(node_id) == XT_NODE_ID(ind->mi_free_list->fl_page_id[guess])) {
			// Should not happen...
			ASSERT_NS(FALSE);
			return OK;
		}
		if (XT_NODE_ID(node_id) < XT_NODE_ID(ind->mi_free_list->fl_page_id[guess]))
			count = guess;
		else
			i = guess + 1;
	}

	/* Insert at position i */
	memmove(ind->mi_free_list->fl_page_id + i + 1, ind->mi_free_list->fl_page_id + i, (ind->mi_free_list->fl_free_count - i) * sizeof(xtIndexNodeID));
	ind->mi_free_list->fl_page_id[i] = node_id;
	ind->mi_free_list->fl_free_count++;

	/* Set the cache page to clean: */
	return xt_ind_free_block(ot, ind, node_id);
}

/*
 * -----------------------------------------------------------------------
 * Simple compare functions
 */

xtPublic int xt_compare_2_int4(XTIndexPtr XT_UNUSED(ind), uint key_length, xtWord1 *key_value, xtWord1 *b_value)
{
	int r;

	ASSERT_NS(key_length == 4 || key_length == 8);
	r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
	if (r == 0 && key_length > 4) {
		key_value += 4;
		b_value += 4;
		r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
	}
	return r;
}

xtPublic int xt_compare_3_int4(XTIndexPtr XT_UNUSED(ind), uint key_length, xtWord1 *key_value, xtWord1 *b_value)
{
	int r;

	ASSERT_NS(key_length == 4 || key_length == 8 || key_length == 12);
	r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
	if (r == 0 && key_length > 4) {
		key_value += 4;
		b_value += 4;
		r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
		if (r == 0 && key_length > 8) {
			key_value += 4;
			b_value += 4;
			r = (xtInt4) XT_GET_DISK_4(key_value) - (xtInt4) XT_GET_DISK_4(b_value);
		}
	}
	return r;
}

/*
 * -----------------------------------------------------------------------
 * Tree branch scanning (searching nodes and leaves)
 */

xtPublic void xt_scan_branch_single(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
{
	XT_NODE_TEMP;
	u_int				branch_size;
	u_int				node_ref_size;
	u_int				full_item_size;
	int					search_flags;
	register xtWord1	*base;
	register u_int		i;
	register xtWord1	*bitem;
	u_int				total_count;

	branch_size = XT_GET_DISK_2(branch->tb_size_2);
	node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;

	result->sr_found = FALSE;
	result->sr_duplicate = FALSE;
	result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
	ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);

	result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
	full_item_size = result->sr_item.i_item_size + node_ref_size;
	result->sr_item.i_node_ref_size = node_ref_size;

	search_flags = value->sv_flags;
	base = branch->tb_data + node_ref_size;
	total_count = (result->sr_item.i_total_size - node_ref_size) / full_item_size;
	if (search_flags & XT_SEARCH_FIRST_FLAG)
		i = 0;
	else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG)
		i = total_count;
	else {
		register u_int		guess;
		register u_int		count;
		register xtInt4		r;
		xtRecordID			key_record;

		key_record = value->sv_rec_id;
		count = total_count;

		ASSERT_NS(ind);
		i = 0;
		while (i < count) {
			guess = (i + count - 1) >> 1;

			bitem = base + guess * full_item_size;

			switch (ind->mi_single_type) {
				case HA_KEYTYPE_LONG_INT: {
					register xtInt4 a, b;

					a = XT_GET_DISK_4(value->sv_key);
					b = XT_GET_DISK_4(bitem);
					r = (a < b) ? -1 : (a == b ? 0 : 1);
					break;
				}
				case HA_KEYTYPE_ULONG_INT: {
					register xtWord4 a, b;

					a = XT_GET_DISK_4(value->sv_key);
					b = XT_GET_DISK_4(bitem);
					r = (a < b) ? -1 : (a == b ? 0 : 1);
					break;
				}
				default:
					/* Should not happen: */
					r = 1;
					break;
			}
			if (r == 0) {
				if (search_flags & XT_SEARCH_WHOLE_KEY) {
					xtRecordID	item_record;
					xtRowID		row_id;

					xt_get_record_ref(bitem + ind->mi_key_size, &item_record, &row_id);

					/* This should not happen because we should never
					 * try to insert the same record twice into the
					 * index!
					 */
					result->sr_duplicate = TRUE;
					if (key_record == item_record) {
						result->sr_found = TRUE;
						result->sr_rec_id = item_record;
						result->sr_row_id = row_id;
						result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
						result->sr_item.i_item_offset = node_ref_size + guess * full_item_size;
						return;
					}
					if (key_record < item_record)
						r = -1;
					else
						r = 1;
				}
				else {
					result->sr_found = TRUE;
					/* -1 causes a search to the beginning of the duplicate list of keys.
					 * 1 causes a search to just after the key.
					 */
					if (search_flags & XT_SEARCH_AFTER_KEY)
						r = 1;
					else
						r = -1;
				}
			}

			if (r < 0)
				count = guess;
			else
				i = guess + 1;
		}
	}

	bitem = base + i * full_item_size;
	xt_get_res_record_ref(bitem + ind->mi_key_size, result);
	result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);			/* Only valid if this is a node. */
	result->sr_item.i_item_offset = node_ref_size + i * full_item_size;
#ifdef IND_SKEW_SPLIT_ON_APPEND
	if (i != total_count)
		result->sr_last_item = FALSE;
#endif
}

/*
 * We use a special binary search here. It basically assumes that the values
 * in the index are not unique.
 *
 * Even if they are unique, when we search for part of a key, then it is
 * effectively the case.
 *
 * So in the situation where we find duplicates in the index we usually
 * want to position ourselves at the beginning of the duplicate list.
 *
 * Alternatively a search can find the position just after a given key.
 *
 * To achieve this we make the following modifications:
 * - The result of the comparison is always 1 or -1. We only stop
 *   the search early in the case of an exact match when inserting (but this
 *   should not happen anyway).
 * - The search never actually fails, but sets 'found' to TRUE if it
 *   sees the search key in the index.
 *
 * If the search value exists in the index we know that
 * this method will take us to the first occurrence of the key in the
 * index (in the case of -1) or to the first value after
 * the search key in the case of 1.
 */
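/* An illustrative walk-through (not from the original source): searching a
 * leaf with keys [10, 20, 20, 20, 30] for key 20, where the comparison is
 * forced to -1 on a match (the default), using the loop below:
 *
 *   i=0, count=5: guess=(0+4)>>1=2, item=20, r=-1 -> count=2
 *   i=0, count=2: guess=(0+1)>>1=0, item=10, r=1  -> i=1
 *   i=1, count=2: guess=(1+1)>>1=1, item=20, r=-1 -> count=1
 *   i=1, count=1: loop ends
 *
 * The search lands on i=1, the FIRST of the three duplicates. With
 * XT_SEARCH_AFTER_KEY (r forced to 1 on a match) the same loop ends with
 * i=4, the first item AFTER the run of duplicates.
 */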
xtPublic void xt_scan_branch_fix(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
{
	XT_NODE_TEMP;
	u_int				branch_size;
	u_int				node_ref_size;
	u_int				full_item_size;
	int					search_flags;
	xtWord1				*base;
	register u_int		i;
	xtWord1				*bitem;
	u_int				total_count;

	branch_size = XT_GET_DISK_2(branch->tb_size_2);
	node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;

	result->sr_found = FALSE;
	result->sr_duplicate = FALSE;
	result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
	ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);

	result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
	full_item_size = result->sr_item.i_item_size + node_ref_size;
	result->sr_item.i_node_ref_size = node_ref_size;

	search_flags = value->sv_flags;
	base = branch->tb_data + node_ref_size;
	total_count = (result->sr_item.i_total_size - node_ref_size) / full_item_size;
	if (search_flags & XT_SEARCH_FIRST_FLAG)
		i = 0;
	else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG)
		i = total_count;
	else {
		register u_int		guess;
		register u_int		count;
		xtRecordID			key_record;
		int					r;

		key_record = value->sv_rec_id;
		count = total_count;

		ASSERT_NS(ind);
		i = 0;
		while (i < count) {
			guess = (i + count - 1) >> 1;

			bitem = base + guess * full_item_size;

			r = myxt_compare_key(ind, search_flags, value->sv_length, value->sv_key, bitem);

			if (r == 0) {
				if (search_flags & XT_SEARCH_WHOLE_KEY) {
					xtRecordID	item_record;
					xtRowID		row_id;

					xt_get_record_ref(bitem + ind->mi_key_size, &item_record, &row_id);

					/* This should not happen because we should never
					 * try to insert the same record twice into the
					 * index!
					 */
					result->sr_duplicate = TRUE;
					if (key_record == item_record) {
						result->sr_found = TRUE;
						result->sr_rec_id = item_record;
						result->sr_row_id = row_id;
						result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
						result->sr_item.i_item_offset = node_ref_size + guess * full_item_size;
						return;
					}
					if (key_record < item_record)
						r = -1;
					else
						r = 1;
				}
				else {
					result->sr_found = TRUE;
					/* -1 causes a search to the beginning of the duplicate list of keys.
					 * 1 causes a search to just after the key.
					 */
					if (search_flags & XT_SEARCH_AFTER_KEY)
						r = 1;
					else
						r = -1;
				}
			}

			if (r < 0)
				count = guess;
			else
				i = guess + 1;
		}
	}

	bitem = base + i * full_item_size;
	xt_get_res_record_ref(bitem + ind->mi_key_size, result);
	result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);			/* Only valid if this is a node. */
	result->sr_item.i_item_offset = node_ref_size + i * full_item_size;
#ifdef IND_SKEW_SPLIT_ON_APPEND
	if (i != total_count)
		result->sr_last_item = FALSE;
#endif
}

xtPublic void xt_scan_branch_fix_simple(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
{
	XT_NODE_TEMP;
	u_int				branch_size;
	u_int				node_ref_size;
	u_int				full_item_size;
	int					search_flags;
	xtWord1				*base;
	register u_int		i;
	xtWord1				*bitem;
	u_int				total_count;

	branch_size = XT_GET_DISK_2(branch->tb_size_2);
	node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;

	result->sr_found = FALSE;
	result->sr_duplicate = FALSE;
	result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
	ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);

	result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
	full_item_size = result->sr_item.i_item_size + node_ref_size;
	result->sr_item.i_node_ref_size = node_ref_size;

	search_flags = value->sv_flags;
	base = branch->tb_data + node_ref_size;
	total_count = (result->sr_item.i_total_size - node_ref_size) / full_item_size;
	if (search_flags & XT_SEARCH_FIRST_FLAG)
		i = 0;
	else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG)
		i = total_count;
	else {
		register u_int		guess;
		register u_int		count;
		xtRecordID			key_record;
		int					r;

		key_record = value->sv_rec_id;
		count = total_count;

		ASSERT_NS(ind);
		i = 0;
		while (i < count) {
			guess = (i + count - 1) >> 1;

			bitem = base + guess * full_item_size;

			r = ind->mi_simple_comp_key(ind, value->sv_length, value->sv_key, bitem);

			if (r == 0) {
				if (search_flags & XT_SEARCH_WHOLE_KEY) {
					xtRecordID	item_record;
					xtRowID		row_id;

					xt_get_record_ref(bitem + ind->mi_key_size, &item_record, &row_id);

					/* This should not happen because we should never
					 * try to insert the same record twice into the
					 * index!
					 */
					result->sr_duplicate = TRUE;
					if (key_record == item_record) {
						result->sr_found = TRUE;
						result->sr_rec_id = item_record;
						result->sr_row_id = row_id;
						result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
						result->sr_item.i_item_offset = node_ref_size + guess * full_item_size;
						return;
					}
					if (key_record < item_record)
						r = -1;
					else
						r = 1;
				}
				else {
					result->sr_found = TRUE;
					/* -1 causes a search to the beginning of the duplicate list of keys.
					 * 1 causes a search to just after the key.
					 */
					if (search_flags & XT_SEARCH_AFTER_KEY)
						r = 1;
					else
						r = -1;
				}
			}

			if (r < 0)
				count = guess;
			else
				i = guess + 1;
		}
	}

	bitem = base + i * full_item_size;
	xt_get_res_record_ref(bitem + ind->mi_key_size, result);
	result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);			/* Only valid if this is a node. */
	result->sr_item.i_item_offset = node_ref_size + i * full_item_size;
#ifdef IND_SKEW_SPLIT_ON_APPEND
	if (i != total_count)
		result->sr_last_item = FALSE;
#endif
}

/*
 * Variable length key values are stored as a sorted list. Since each list
 * item has a variable length, we must scan the list sequentially in order
 * to find a key.
 */
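/* On-page layout assumed by the sequential scan below (an illustrative
 * summary, not part of the original source):
 *
 *   leaf:  [key1 rec_ref1][key2 rec_ref2]...
 *   node:  [node_ref0][key1 rec_ref1][node_ref1][key2 rec_ref2][node_ref2]...
 *
 * Each rec_ref is XT_RECORD_REF_SIZE bytes (record ID plus row ID), and each
 * node_ref is XT_NODE_REF_SIZE bytes. The node reference belonging to an
 * item sits to its LEFT, which is why IDX_GET_NODE_REF() subtracts
 * node_ref_size, and why the scan advances by
 * ilen + XT_RECORD_REF_SIZE + node_ref_size.
 */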
xtPublic void xt_scan_branch_var(struct XTTable *XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxKeyValuePtr value, register XTIdxResultRec *result)
{
	XT_NODE_TEMP;
	u_int			branch_size;
	u_int			node_ref_size;
	int				search_flags;
	xtWord1			*base;
	xtWord1			*bitem;
	u_int			ilen;
	xtWord1			*bend;

	branch_size = XT_GET_DISK_2(branch->tb_size_2);
	node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;

	result->sr_found = FALSE;
	result->sr_duplicate = FALSE;
	result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
	ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);

	result->sr_item.i_node_ref_size = node_ref_size;

	search_flags = value->sv_flags;
	base = branch->tb_data + node_ref_size;
	bitem = base;
	bend = &branch->tb_data[result->sr_item.i_total_size];
	ilen = 0;
	if (bitem >= bend)
		goto done_ok;

	if (search_flags & XT_SEARCH_FIRST_FLAG)
		ilen = myxt_get_key_length(ind, bitem);
	else if (search_flags & XT_SEARCH_AFTER_LAST_FLAG) {
		bitem = bend;
		ilen = 0;
	}
	else {
		xtRecordID	key_record;
		int			r;

		key_record = value->sv_rec_id;

		ASSERT_NS(ind);
		while (bitem < bend) {
			ilen = myxt_get_key_length(ind, bitem);
			r = myxt_compare_key(ind, search_flags, value->sv_length, value->sv_key, bitem);
			if (r == 0) {
				if (search_flags & XT_SEARCH_WHOLE_KEY) {
					xtRecordID	item_record;
					xtRowID		row_id;

					xt_get_record_ref(bitem + ilen, &item_record, &row_id);

					/* This should not happen because we should never
					 * try to insert the same record twice into the
					 * index!
					 */
					result->sr_duplicate = TRUE;
					if (key_record == item_record) {
						result->sr_found = TRUE;
						result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
						result->sr_rec_id = item_record;
						result->sr_row_id = row_id;
						result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);
						result->sr_item.i_item_offset = bitem - branch->tb_data;
						return;
					}
					if (key_record < item_record)
						r = -1;
					else
						r = 1;
				}
				else {
					result->sr_found = TRUE;
					/* -1 causes a search to the beginning of the duplicate list of keys.
					 * 1 causes a search to just after the key.
					 */
					if (search_flags & XT_SEARCH_AFTER_KEY)
						r = 1;
					else
						r = -1;
				}
			}
			if (r <= 0)
				break;
			bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
		}
	}

	done_ok:
	result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
	xt_get_res_record_ref(bitem + ilen, result);
	result->sr_branch = IDX_GET_NODE_REF(tab, bitem, node_ref_size);			/* Only valid if this is a node. */
	result->sr_item.i_item_offset = bitem - branch->tb_data;
#ifdef IND_SKEW_SPLIT_ON_APPEND
	if (bitem != bend)
		result->sr_last_item = FALSE;
#endif
}

/* Go to the next item in the node. */
static void idx_next_branch_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultRec *result)
{
	XT_NODE_TEMP;
	xtWord1	*bitem;
	u_int	ilen;

	result->sr_item.i_item_offset += result->sr_item.i_item_size + result->sr_item.i_node_ref_size;
	bitem = branch->tb_data + result->sr_item.i_item_offset;
	if (result->sr_item.i_item_offset < result->sr_item.i_total_size) {
		if (ind->mi_fix_key)
			ilen = result->sr_item.i_item_size;
		else {
			ilen = myxt_get_key_length(ind, bitem) + XT_RECORD_REF_SIZE;
			result->sr_item.i_item_size = ilen;
		}
		xt_get_res_record_ref(bitem + ilen - XT_RECORD_REF_SIZE, result); /* (Only valid if i_item_offset < i_total_size) */
	}
	else {
		result->sr_item.i_item_size = 0;
		result->sr_rec_id = 0;
		result->sr_row_id = 0;
	}
	if (result->sr_item.i_node_ref_size)
		/* IDX_GET_NODE_REF() loads the branch reference to the LEFT of the item. */
		result->sr_branch = IDX_GET_NODE_REF(tab, bitem, result->sr_item.i_node_ref_size);
	else
		result->sr_branch = 0;
}

xtPublic void xt_prev_branch_item_fix(XTTableHPtr XT_UNUSED(tab), XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr branch, register XTIdxResultRec *result)
{
	XT_NODE_TEMP;
	ASSERT_NS(result->sr_item.i_item_offset >= result->sr_item.i_item_size + result->sr_item.i_node_ref_size + result->sr_item.i_node_ref_size);
	result->sr_item.i_item_offset -= (result->sr_item.i_item_size + result->sr_item.i_node_ref_size);
	xt_get_res_record_ref(branch->tb_data + result->sr_item.i_item_offset + result->sr_item.i_item_size - XT_RECORD_REF_SIZE, result); /* (Only valid if i_item_offset < i_total_size) */
	result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + result->sr_item.i_item_offset, result->sr_item.i_node_ref_size);
}

xtPublic void xt_prev_branch_item_var(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultRec *result)
{
	XT_NODE_TEMP;
	xtWord1	*bitem;
	xtWord1	*bend;
	u_int	ilen;

	bitem = branch->tb_data + result->sr_item.i_node_ref_size;
	bend = &branch->tb_data[result->sr_item.i_item_offset];
	for (;;) {
		ilen = myxt_get_key_length(ind, bitem);
		if (bitem + ilen + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size >= bend)
			break;
		bitem += ilen + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size;
	}

	result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
	xt_get_res_record_ref(bitem + ilen, result); /* (Only valid if i_item_offset < i_total_size) */
	result->sr_branch = IDX_GET_NODE_REF(tab, bitem, result->sr_item.i_node_ref_size);
	result->sr_item.i_item_offset = bitem - branch->tb_data;
}

static void idx_reload_item_fix(XTIndexPtr XT_NDEBUG_UNUSED(ind), XTIdxBranchDPtr branch, register XTIdxResultPtr result)
{
	u_int branch_size;

	branch_size = XT_GET_DISK_2(branch->tb_size_2);
	ASSERT_NS(result->sr_item.i_node_ref_size == (XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0));
	ASSERT_NS(result->sr_item.i_item_size == ind->mi_key_size + XT_RECORD_REF_SIZE);
	result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
	if (result->sr_item.i_item_offset > result->sr_item.i_total_size)
		result->sr_item.i_item_offset = result->sr_item.i_total_size;
	xt_get_res_record_ref(&branch->tb_data[result->sr_item.i_item_offset + result->sr_item.i_item_size - XT_RECORD_REF_SIZE], result);
}

static void idx_first_branch_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultPtr result)
{
	XT_NODE_TEMP;
	u_int branch_size;
	u_int node_ref_size;
	u_int key_data_size;

	branch_size = XT_GET_DISK_2(branch->tb_size_2);
	node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;

	result->sr_found = FALSE;
	result->sr_duplicate = FALSE;
	result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
	ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);

	if (ind->mi_fix_key)
		key_data_size = ind->mi_key_size;
	else {
		xtWord1 *bitem;

		bitem = branch->tb_data + node_ref_size;
		if (bitem < &branch->tb_data[result->sr_item.i_total_size])
			key_data_size = myxt_get_key_length(ind, bitem);
		else
			key_data_size = 0;
	}

	result->sr_item.i_item_size = key_data_size + XT_RECORD_REF_SIZE;
	result->sr_item.i_node_ref_size = node_ref_size;

	xt_get_res_record_ref(branch->tb_data + node_ref_size + key_data_size, result);
	result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + node_ref_size, node_ref_size); /* Only valid if this is a node. */
	result->sr_item.i_item_offset = node_ref_size;
}

/*
 * "Last" means different things for a leaf and a node! In a node, the last
 * position is the rightmost node reference (no key item follows it); in a
 * leaf, it is simply the last item on the page.
 */
xtPublic void xt_last_branch_item_fix(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultPtr result)
{
	XT_NODE_TEMP;
	u_int branch_size;
	u_int node_ref_size;

	branch_size = XT_GET_DISK_2(branch->tb_size_2);
	node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;

	result->sr_found = FALSE;
	result->sr_duplicate = FALSE;
	result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
	ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);

	result->sr_item.i_item_size = ind->mi_key_size + XT_RECORD_REF_SIZE;
	result->sr_item.i_node_ref_size = node_ref_size;

	if (node_ref_size) {
		result->sr_item.i_item_offset = result->sr_item.i_total_size;
		result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + result->sr_item.i_item_offset, node_ref_size);
	}
	else {
		if (result->sr_item.i_total_size) {
			result->sr_item.i_item_offset = result->sr_item.i_total_size - result->sr_item.i_item_size;
			xt_get_res_record_ref(branch->tb_data + result->sr_item.i_item_offset + ind->mi_key_size, result);
		}
		else
			/* Leaf is empty: */
			result->sr_item.i_item_offset = 0;
	}
}

xtPublic void xt_last_branch_item_var(XTTableHPtr XT_UNUSED(tab), XTIndexPtr ind, XTIdxBranchDPtr branch, register XTIdxResultPtr result)
{
	XT_NODE_TEMP;
	u_int	branch_size;
	u_int	node_ref_size;

	branch_size = XT_GET_DISK_2(branch->tb_size_2);
	node_ref_size = XT_IS_NODE(branch_size) ? XT_NODE_REF_SIZE : 0;

	result->sr_found = FALSE;
	result->sr_duplicate = FALSE;
	result->sr_item.i_total_size = XT_GET_BRANCH_DATA_SIZE(branch_size);
	ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE-2);

	result->sr_item.i_node_ref_size = node_ref_size;

	if (node_ref_size) {
		result->sr_item.i_item_offset = result->sr_item.i_total_size;
		result->sr_branch = IDX_GET_NODE_REF(tab, branch->tb_data + result->sr_item.i_item_offset, node_ref_size);
		result->sr_item.i_item_size = 0;
	}
	else {
		if (result->sr_item.i_total_size) {
			xtWord1	*bitem;
			u_int	ilen;
			xtWord1	*bend;

			bitem = branch->tb_data + node_ref_size;
			bend = &branch->tb_data[result->sr_item.i_total_size];
			ilen = 0;
			if (bitem < bend) {
				for (;;) {
					ilen = myxt_get_key_length(ind, bitem);
					if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend)
						break;
					bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
				}
			}

			result->sr_item.i_item_offset = bitem - branch->tb_data;
			xt_get_res_record_ref(bitem + ilen, result);
			result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;
		}
		else {
			/* Leaf is empty: */
			result->sr_item.i_item_offset = 0;
			result->sr_item.i_item_size = 0;
		}
	}
}

xtPublic xtBool xt_idx_lazy_delete_on_leaf(XTIndexPtr ind, XTIndBlockPtr block, xtWord2 branch_size)
{
	ASSERT_NS(ind->mi_fix_key);

	/* Compact the leaf if more than half the items that fit on the page
	 * are deleted: */
	if (block->cp_del_count >= ind->mi_max_items/2)
		return FALSE;

	/* Compact the page if there is only 1 (or less) valid item left: */
	if ((u_int) block->cp_del_count+1 >= ((u_int) branch_size - 2)/(ind->mi_key_size + XT_RECORD_REF_SIZE))
		return FALSE;

	return OK;
}

static xtBool idx_lazy_delete_on_node(XTIndexPtr ind, XTIndBlockPtr block, register XTIdxItemPtr item)
{
	ASSERT_NS(ind->mi_fix_key);

	/* Compact the node if more than 1/4 of the items that fit on the page
	 * are deleted: */
	if (block->cp_del_count >= ind->mi_max_items/4)
		return FALSE;

	/* Compact the page if there is only 1 (or less) valid item left: */
	if ((u_int) block->cp_del_count+1 >= (item->i_total_size - item->i_node_ref_size)/(item->i_item_size + item->i_node_ref_size))
		return FALSE;

	return OK;
}

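/* A note on the return values of the two functions above (an illustrative
 * reading, based on the thresholds and the names): OK means a lazy delete is
 * permitted, while FALSE means the page has accumulated too many deleted
 * items and should be compacted (or the item removed for real) instead. So
 * "compact if more than half the items are deleted" is implemented by
 * returning FALSE once cp_del_count reaches ind->mi_max_items/2.
 */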
inline static xtBool idx_cmp_item_key_fix(XTIndReferencePtr iref, register XTIdxItemPtr item, XTIdxKeyValuePtr value)
{
	xtWord1 *data;

	data = &iref->ir_branch->tb_data[item->i_item_offset];
	return memcmp(data, value->sv_key, value->sv_length) == 0;
}

inline static void idx_set_item_key_fix(XTIndReferencePtr iref, register XTIdxItemPtr item, XTIdxKeyValuePtr value)
{
	xtWord1 *data;

	data = &iref->ir_branch->tb_data[item->i_item_offset];
	memcpy(data, value->sv_key, value->sv_length);
	xt_set_val_record_ref(data + value->sv_length, value);
#ifdef IND_OPT_DATA_WRITTEN
	if (item->i_item_offset < iref->ir_block->cb_min_pos)
		iref->ir_block->cb_min_pos = item->i_item_offset;
	if (item->i_item_offset + value->sv_length > iref->ir_block->cb_max_pos)
		iref->ir_block->cb_max_pos = item->i_item_offset + value->sv_length;
	ASSERT_NS(iref->ir_block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
	ASSERT_NS(iref->ir_block->cb_min_pos <= iref->ir_block->cb_max_pos);
#endif
	iref->ir_updated = TRUE;
}

static xtBool idx_set_item_row_id(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item, xtRowID row_id)
{
	register XTIndBlockPtr	block = iref->ir_block;
	size_t					offset;
	xtWord1					*data;

	if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
			xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
			return FAILED;
		}
	}

	offset =
		/* This is the offset of the reference in the item we found: */
		item->i_item_offset + item->i_item_size - XT_RECORD_REF_SIZE +
		/* This is the offset of the row id in the reference: */
		XT_RECORD_ID_SIZE;
	data = &iref->ir_branch->tb_data[offset];

	/* This update does not change the structure of page, so we do it without
	 * copying the page before we write.
	 */
	XT_SET_DISK_4(data, row_id);
#ifdef IND_OPT_DATA_WRITTEN
	if (offset < block->cb_min_pos)
		block->cb_min_pos = offset;
	if (offset + XT_ROW_ID_SIZE > block->cb_max_pos)
		block->cb_max_pos = offset + XT_ROW_ID_SIZE;
	ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
	ASSERT_NS(block->cb_min_pos <= iref->ir_block->cb_max_pos);
#endif
	iref->ir_updated = TRUE;
	return OK;
}

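/* Layout assumed by the offset arithmetic above (an illustrative note: a
 * record reference is a record ID followed by a row ID, i.e. assuming
 * 4-byte IDs, XT_RECORD_REF_SIZE == XT_RECORD_ID_SIZE + XT_ROW_ID_SIZE == 8):
 *
 *   i_item_offset = 100, i_item_size = 20
 *   offset = 100 + 20 - 8 + 4 = 116
 *
 * so the 4 row-ID bytes at positions 116..119 are the last 4 bytes of the
 * item, and XT_SET_DISK_4() can patch them in place.
 */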
inline static xtBool idx_is_item_deleted(register XTIdxBranchDPtr branch, register XTIdxItemPtr item)
{
	xtWord1	*data;

	data = &branch->tb_data[item->i_item_offset + item->i_item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE];
	return XT_GET_DISK_4(data) == (xtRowID) -1;
}

static xtBool idx_set_item_deleted(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
{
	if (!idx_set_item_row_id(ot, ind, iref, item, (xtRowID) -1))
		return FAILED;

	/* This should be safe because there is only one thread,
	 * the sweeper, that does this!
	 *
	 * Threads that decrement this value have an xlock on
	 * the page, or the index.
	 */
	iref->ir_block->cp_del_count++;
	return OK;
}

/*
 * {LAZY-DEL-INDEX-ITEMS}
 * Do a lazy delete of an item by just setting the Row ID
 * to the delete indicator: row ID -1.
 */
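/* In other words (an illustrative note): the 4-byte row ID at the end of an
 * item's record reference is overwritten with (xtRowID) -1, i.e. 0xFFFFFFFF.
 * idx_is_item_deleted() above tests for exactly this value, and cp_del_count
 * on the cache block counts such items so that idx_lazy_delete_on_node() and
 * xt_idx_lazy_delete_on_leaf() can decide when a page is worth compacting.
 */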
static xtBool idx_lazy_delete_branch_item(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
{
	if (!idx_set_item_deleted(ot, ind, iref, item))
		return FAILED;
	xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_R_UPDATE, iref);
	return OK;
}

/*
 * This function compacts the leaf, but preserves the
 * position of the item.
 */
static xtBool idx_compact_leaf(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
{
	register XTIndBlockPtr		block = iref->ir_block;
	register XTIdxBranchDPtr	branch = iref->ir_branch;
	int		item_idx, count, i, idx;
	u_int	size;
	xtWord1	*s_data;
	xtWord1	*d_data;
	xtWord1	*data;
	xtRowID	row_id;

	if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
			xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
			return FAILED;
		}
	}

	if (block->cb_handle_count) {
		if (!xt_ind_copy_on_write(iref)) {
			xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
			return FAILED;
		}
	}

	ASSERT_NS(!item->i_node_ref_size);
	ASSERT_NS(ind->mi_fix_key);
	size = item->i_item_size;
	count = item->i_total_size / size;
	item_idx = item->i_item_offset / size;
	s_data = d_data = branch->tb_data;
	idx = 0;
	for (i=0; i<count; i++) {
		data = s_data + item->i_item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE;
		row_id = XT_GET_DISK_4(data);
		if (row_id == (xtRowID) -1) {
			if (idx < item_idx)
				item_idx--;
		}
		else {
			if (d_data != s_data)
				memcpy(d_data, s_data, size);
			d_data += size;
			idx++;
		}
		s_data += size;
	}
	block->cp_del_count = 0;
	item->i_total_size = d_data - branch->tb_data;
	ASSERT_NS(idx * size == item->i_total_size);
	item->i_item_offset = item_idx * size;
	XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, 0));
#ifdef IND_OPT_DATA_WRITTEN
	block->cb_header = TRUE;
	block->cb_min_pos = 0;
	block->cb_max_pos = item->i_total_size;
	ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
	ASSERT_NS(block->cb_min_pos <= iref->ir_block->cb_max_pos);
#endif
	iref->ir_updated = TRUE;
	return OK;
}

static xtBool idx_lazy_remove_leaf_item_right(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref, register XTIdxItemPtr item)
{
	register XTIndBlockPtr		block = iref->ir_block;
	register XTIdxBranchDPtr	branch = iref->ir_branch;
	int		item_idx, count, i;
	u_int	size;
	xtWord1	*s_data;
	xtWord1	*d_data;
	xtWord1	*data;
	xtRowID	row_id;

	ASSERT_NS(!item->i_node_ref_size);

	if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
			xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
			return FAILED;
		}
	}

	if (block->cb_handle_count) {
		if (!xt_ind_copy_on_write(iref)) {
			xt_ind_release(ot, ind, XT_UNLOCK_WRITE, iref);
			return FAILED;
		}
	}

	ASSERT_NS(ind->mi_fix_key);
	size = item->i_item_size;
	count = item->i_total_size / size;
	item_idx = item->i_item_offset / size;
	s_data = d_data = branch->tb_data;
	for (i=0; i<count; i++) {
		if (i == item_idx)
			item->i_item_offset = d_data - branch->tb_data;
		else {
			data = s_data + item->i_item_size - XT_RECORD_REF_SIZE + XT_RECORD_ID_SIZE;
			row_id = XT_GET_DISK_4(data);
			if (row_id != (xtRowID) -1) {
				if (d_data != s_data)
					memcpy(d_data, s_data, size);
				d_data += size;
			}
		}
		s_data += size;
	}
	block->cp_del_count = 0;
	item->i_total_size = d_data - branch->tb_data;
	XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, 0));
#ifdef IND_OPT_DATA_WRITTEN
	block->cb_header = TRUE;
	block->cb_min_pos = 0;
	block->cb_max_pos = item->i_total_size;
	ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
	ASSERT_NS(block->cb_min_pos <= iref->ir_block->cb_max_pos);
#endif
	iref->ir_updated = TRUE;
	xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, iref);
	return OK;
}

/*
 * Remove an item and save to disk.
 */
static xtBool idx_remove_branch_item_right(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID, XTIndReferencePtr iref, register XTIdxItemPtr item)
{
	register XTIndBlockPtr		block = iref->ir_block;
	register XTIdxBranchDPtr	branch = iref->ir_branch;
	u_int size = item->i_item_size + item->i_node_ref_size;

	if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
			xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
			return FAILED;
		}
	}

	/* {HANDLE-COUNT-USAGE}
	 * This access is safe because we have the right to update
	 * the page, so no other thread can modify the page.
	 *
	 * This means:
	 * We either have an Xlock on the index, or we have
	 * an Xlock on the cache block.
	 */
	if (block->cb_handle_count) {
		if (!xt_ind_copy_on_write(iref)) {
			xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, iref);
			return FAILED;
		}
	}
	if (ind->mi_lazy_delete) {
		if (idx_is_item_deleted(branch, item))
			block->cp_del_count--;
	}
	/* Remove the item, and the node reference to the right of it: */
	memmove(&branch->tb_data[item->i_item_offset],
		&branch->tb_data[item->i_item_offset + size],
		item->i_total_size - item->i_item_offset - size);
	item->i_total_size -= size;
	XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, item->i_node_ref_size));
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(branch->tb_size_2));
#ifdef IND_OPT_DATA_WRITTEN
	block->cb_header = TRUE;
	if (item->i_item_offset < block->cb_min_pos)
		block->cb_min_pos = item->i_item_offset;
	block->cb_max_pos = item->i_total_size;
	ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
	ASSERT_NS(block->cb_min_pos <= block->cb_max_pos);
#endif
	iref->ir_updated = TRUE;
	xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_W_UPDATE, iref);
	return OK;
}

static xtBool idx_remove_branch_item_left(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID, XTIndReferencePtr iref, register XTIdxItemPtr item, xtBool *lazy_delete_cleanup_required)
{
	register XTIndBlockPtr		block = iref->ir_block;
	register XTIdxBranchDPtr	branch = iref->ir_branch;
	u_int size = item->i_item_size + item->i_node_ref_size;

	if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
			xt_ind_release(ot, ind, iref->ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, iref);
			return FAILED;
		}
	}

	ASSERT_NS(item->i_node_ref_size);
	if (block->cb_handle_count) {
		if (!xt_ind_copy_on_write(iref)) {
			xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, iref);
			return FAILED;
		}
	}
	if (ind->mi_lazy_delete) {
		if (idx_is_item_deleted(branch, item))
			block->cp_del_count--;
		if (lazy_delete_cleanup_required)
			*lazy_delete_cleanup_required = idx_lazy_delete_on_node(ind, block, item);
	}
	/* Remove the node reference to the left of the item: */
	memmove(&branch->tb_data[item->i_item_offset - item->i_node_ref_size],
		&branch->tb_data[item->i_item_offset + item->i_item_size],
		item->i_total_size - item->i_item_offset - item->i_item_size);
	item->i_total_size -= size;
	XT_SET_DISK_2(branch->tb_size_2, XT_MAKE_BRANCH_SIZE(item->i_total_size, item->i_node_ref_size));
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(branch->tb_size_2));
#ifdef IND_OPT_DATA_WRITTEN
	block->cb_header = TRUE;
	if (item->i_item_offset - item->i_node_ref_size < block->cb_min_pos)
		block->cb_min_pos = item->i_item_offset - item->i_node_ref_size;
	block->cb_max_pos = item->i_total_size;
	ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
	ASSERT_NS(block->cb_min_pos <= block->cb_max_pos);
#endif
	iref->ir_updated = TRUE;
	xt_ind_release(ot, ind, item->i_node_ref_size ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_W_UPDATE, iref);
	return OK;
}

static void idx_insert_leaf_item(XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr leaf, XTIdxKeyValuePtr value, XTIdxResultPtr result)
{
	xtWord1 *item;

	/* This will ensure we do not overwrite the end of the buffer: */
	ASSERT_NS(value->sv_length <= XT_INDEX_MAX_KEY_SIZE);
	memmove(&leaf->tb_data[result->sr_item.i_item_offset + value->sv_length + XT_RECORD_REF_SIZE],
		&leaf->tb_data[result->sr_item.i_item_offset],
		result->sr_item.i_total_size - result->sr_item.i_item_offset);
	item = &leaf->tb_data[result->sr_item.i_item_offset];
	memcpy(item, value->sv_key, value->sv_length);
	xt_set_val_record_ref(item + value->sv_length, value);
	result->sr_item.i_total_size += value->sv_length + XT_RECORD_REF_SIZE;
	XT_SET_DISK_2(leaf->tb_size_2, XT_MAKE_LEAF_SIZE(result->sr_item.i_total_size));
}

static void idx_insert_node_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr leaf, XTIdxKeyValuePtr value, XTIdxResultPtr result, xtIndexNodeID branch)
{
	xtWord1 *item;

	/* This will ensure we do not overwrite the end of the buffer: */
	ASSERT_NS(value->sv_length <= XT_INDEX_MAX_KEY_SIZE);
	memmove(&leaf->tb_data[result->sr_item.i_item_offset + value->sv_length + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size],
		&leaf->tb_data[result->sr_item.i_item_offset],
		result->sr_item.i_total_size - result->sr_item.i_item_offset);
	item = &leaf->tb_data[result->sr_item.i_item_offset];
	memcpy(item, value->sv_key, value->sv_length);
	xt_set_val_record_ref(item + value->sv_length, value);
	XT_SET_NODE_REF(tab, item + value->sv_length + XT_RECORD_REF_SIZE, branch);
	result->sr_item.i_total_size += value->sv_length + XT_RECORD_REF_SIZE + result->sr_item.i_node_ref_size;
	XT_SET_DISK_2(leaf->tb_size_2, XT_MAKE_NODE_SIZE(result->sr_item.i_total_size));
}

static xtBool idx_get_middle_branch_item(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxBranchDPtr branch, XTIdxKeyValuePtr value, XTIdxResultPtr result)
{
	xtWord1	*bitem;

	ASSERT_NS(result->sr_item.i_node_ref_size == 0 || result->sr_item.i_node_ref_size == XT_NODE_REF_SIZE);
	ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE*2);
	if (ind->mi_fix_key) {
		u_int full_item_size = result->sr_item.i_item_size + result->sr_item.i_node_ref_size;

		result->sr_item.i_item_offset = ((result->sr_item.i_total_size - result->sr_item.i_node_ref_size)
			/ full_item_size / 2 * full_item_size) + result->sr_item.i_node_ref_size;
#ifdef IND_SKEW_SPLIT_ON_APPEND
		if (result->sr_last_item) {
			u_int offset;

			offset = result->sr_item.i_total_size - full_item_size * 2;
			/* We actually split at the item before last! */
			if (offset > result->sr_item.i_item_offset)
				result->sr_item.i_item_offset = offset;
		}
#endif

		bitem = &branch->tb_data[result->sr_item.i_item_offset];
		value->sv_flags = XT_SEARCH_WHOLE_KEY;
		value->sv_length = result->sr_item.i_item_size - XT_RECORD_REF_SIZE;
		xt_get_record_ref(bitem + value->sv_length, &value->sv_rec_id, &value->sv_row_id);
		memcpy(value->sv_key, bitem, value->sv_length);
	}
	else {
		u_int	node_ref_size;
		u_int	ilen, tlen;
		xtWord1	*bend;

		node_ref_size = result->sr_item.i_node_ref_size;
		bitem = branch->tb_data + node_ref_size;
		bend = &branch->tb_data[(result->sr_item.i_total_size - node_ref_size) / 2 + node_ref_size];
#ifdef IND_SKEW_SPLIT_ON_APPEND
		if (result->sr_last_item)
			bend = &branch->tb_data[XT_INDEX_PAGE_DATA_SIZE];

		u_int	prev_ilen = 0;
		xtWord1	*prev_bitem = NULL;
#endif
		ilen = 0;
		if (bitem < bend) {
			tlen = 0;
			for (;;) {
				ilen = myxt_get_key_length(ind, bitem);
				tlen += ilen + XT_RECORD_REF_SIZE + node_ref_size;
				if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend) {
					if (ilen > XT_INDEX_PAGE_SIZE || tlen > result->sr_item.i_total_size) {
						xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
						return FAILED;
					}
					break;
				}
#ifdef IND_SKEW_SPLIT_ON_APPEND
				prev_ilen = ilen;
				prev_bitem = bitem;
#endif
				bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
			}
		}

#ifdef IND_SKEW_SPLIT_ON_APPEND
		/* We actually want the item before last! */
		if (result->sr_last_item && prev_bitem) {
			bitem = prev_bitem;
			ilen = prev_ilen;
		}
#endif
		result->sr_item.i_item_offset = bitem - branch->tb_data;
		result->sr_item.i_item_size = ilen + XT_RECORD_REF_SIZE;

		value->sv_flags = XT_SEARCH_WHOLE_KEY;
		value->sv_length = ilen;
		xt_get_record_ref(bitem + ilen, &value->sv_rec_id, &value->sv_row_id);
		memcpy(value->sv_key, bitem, value->sv_length);
	}
	return OK;
}

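/* Worked example (illustrative) of the fixed-key middle computation above:
 * with node_ref_size = 4, full_item_size = 16 and
 * i_total_size = 4 + 7*16 = 116 (seven items), we get
 *
 *   (116 - 4) / 16 / 2 * 16 + 4  =  3*16 + 4  =  52,
 *
 * i.e. the item at index 3 (the fourth of seven) becomes the middle key
 * that is promoted to the parent when the page is split.
 */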
static size_t idx_write_branch_item(XTIndexPtr XT_UNUSED(ind), xtWord1 *item, XTIdxKeyValuePtr value)
{
	memcpy(item, value->sv_key, value->sv_length);
	xt_set_val_record_ref(item + value->sv_length, value);
	return value->sv_length + XT_RECORD_REF_SIZE;
}

static xtBool idx_replace_node_key(XTOpenTablePtr ot, XTIndexPtr ind, IdxStackItemPtr item, IdxBranchStackPtr stack, u_int item_size, xtWord1 *item_buf)
{
	XTIndReferenceRec	iref;
	xtIndexNodeID		new_branch;
	XTIdxResultRec		result;
	xtIndexNodeID		current = item->i_branch;
	u_int				new_size;
	XTIdxBranchDPtr		new_branch_ptr;
	XTIdxKeyValueRec	key_value;
	xtWord1				key_buf[XT_INDEX_MAX_KEY_SIZE];

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
	if (!xt_ind_fetch(ot, ind, current, XT_LOCK_WRITE, &iref))
		return FAILED;

	if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
			goto failed_1;
	}

	if (iref.ir_block->cb_handle_count) {
		if (!xt_ind_copy_on_write(&iref))
			goto failed_1;
	}

	if (ind->mi_lazy_delete) {
		ASSERT_NS(item_size == item->i_pos.i_item_size);
		if (idx_is_item_deleted(iref.ir_branch, &item->i_pos))
			iref.ir_block->cp_del_count--;
	}
	memmove(&iref.ir_branch->tb_data[item->i_pos.i_item_offset + item_size],
		&iref.ir_branch->tb_data[item->i_pos.i_item_offset + item->i_pos.i_item_size],
		item->i_pos.i_total_size - item->i_pos.i_item_offset - item->i_pos.i_item_size);
	memcpy(&iref.ir_branch->tb_data[item->i_pos.i_item_offset],
		item_buf, item_size);
	if (ind->mi_lazy_delete) {
		if (idx_is_item_deleted(iref.ir_branch, &item->i_pos))
			iref.ir_block->cp_del_count++;
	}
	item->i_pos.i_total_size = item->i_pos.i_total_size + item_size - item->i_pos.i_item_size;
	XT_SET_DISK_2(iref.ir_branch->tb_size_2, XT_MAKE_NODE_SIZE(item->i_pos.i_total_size));
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(iref.ir_branch->tb_size_2));
#ifdef IND_OPT_DATA_WRITTEN
	iref.ir_block->cb_header = TRUE;
	if (item->i_pos.i_item_offset < iref.ir_block->cb_min_pos)
		iref.ir_block->cb_min_pos = item->i_pos.i_item_offset;
	iref.ir_block->cb_max_pos = item->i_pos.i_total_size;
	ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
#endif
	iref.ir_updated = TRUE;

#ifdef DEBUG
	if (ind->mi_lazy_delete)
		ASSERT_NS(item->i_pos.i_total_size <= XT_INDEX_PAGE_DATA_SIZE);
#endif
	if (item->i_pos.i_total_size <= XT_INDEX_PAGE_DATA_SIZE)
		return xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);

	/* The node has overflowed!! */
#ifdef IND_SKEW_SPLIT_ON_APPEND
	result.sr_last_item = FALSE;
#endif
	result.sr_item = item->i_pos;

	/* Adjust the stack (we want the parents of the deleted node): */
	for (;;) {
		if (idx_pop(stack) == item)
			break;
	}

	/* We assume that value can be overwritten (which is the case): */
	key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
	key_value.sv_key = key_buf;
	if (!idx_get_middle_branch_item(ot, ind, iref.ir_branch, &key_value, &result))
		goto failed_1;

	if (!idx_new_branch(ot, ind, &new_branch))
		goto failed_1;

	/* Split the node: */
	new_size = result.sr_item.i_total_size - result.sr_item.i_item_offset - result.sr_item.i_item_size;
	// TODO: Are 2 buffers now required?
	new_branch_ptr = (XTIdxBranchDPtr) &ot->ot_ind_wbuf.tb_data[XT_INDEX_PAGE_DATA_SIZE];
	memmove(new_branch_ptr->tb_data, &iref.ir_branch->tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size], new_size);

	XT_SET_DISK_2(new_branch_ptr->tb_size_2, XT_MAKE_NODE_SIZE(new_size));
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(new_branch_ptr->tb_size_2));
	if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + new_size, (xtWord1 *) new_branch_ptr))
		goto failed_2;

	/* Change the size of the old branch: */
	XT_SET_DISK_2(iref.ir_branch->tb_size_2, XT_MAKE_NODE_SIZE(result.sr_item.i_item_offset));
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(iref.ir_branch->tb_size_2));
#ifdef IND_OPT_DATA_WRITTEN
	iref.ir_block->cb_header = TRUE;
	if (result.sr_item.i_item_offset < iref.ir_block->cb_min_pos)
		iref.ir_block->cb_min_pos = result.sr_item.i_item_offset;
	iref.ir_block->cb_max_pos = result.sr_item.i_item_offset;
	ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
	ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
#endif
	iref.ir_updated = TRUE;
	xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);

	/* Insert the new branch into the parent node, using the new middle key value: */
	if (!idx_insert_node(ot, ind, stack, FALSE, &key_value, new_branch)) {
		/*
		 * TODO: Mark the index as corrupt.
		 * This should not fail because everything has been
		 * preallocated.
		 * However, if it does fail the index
		 * will be corrupt.
		 * I could modify and release the branch above,
		 * after this point.
		 * But that would mean holding the lock longer,
		 * and also may not help because idx_insert_node()
1655
		 * is recursive.
1656
		 */
1657
		idx_free_branch(ot, ind, new_branch);
1658
		return FAILED;
1659
	}
1660
1661
	return OK;
1662
1663
	failed_2:
1664
	idx_free_branch(ot, ind, new_branch);
1665
1666
	failed_1:
1667
	xt_ind_release(ot, ind, XT_UNLOCK_WRITE, &iref);
1668
1669
	return FAILED;
1670
}
1671
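/* Overflow handling above (and again in idx_insert_node() below) follows
 * the standard b-tree split: idx_get_middle_branch_item() picks the
 * middle item, everything to its right is moved into a newly allocated
 * branch, the old branch is truncated at the middle item's offset, and
 * the middle key plus a reference to the new branch is inserted into the
 * parent taken from the stack:
 *
 *   before: [ left items | middle | right items ]      (overflowed page)
 *   after:  [ left items ]                             (old branch)
 *           [ right items ]                            (new branch)
 *           middle key + new branch ref  ->  parent
 */
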
/*
 * -----------------------------------------------------------------------
 * Standard b-tree insert
 */

/*
 * Insert the given branch into the node on the top of the stack. If the stack
 * is empty we need to add a new root.
 */
static xtBool idx_insert_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackPtr stack, xtBool last_item, XTIdxKeyValuePtr key_value, xtIndexNodeID branch)
{
	IdxStackItemPtr		stack_item;
	xtIndexNodeID		new_branch;
	size_t				size;
	xtIndexNodeID		current;
	XTIndReferenceRec	iref;
	XTIdxResultRec		result;
	u_int				new_size;
	XTIdxBranchDPtr		new_branch_ptr;
#ifdef IND_OPT_DATA_WRITTEN
	u_int				new_min_pos;
#endif

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
	/* Insert a new branch (key, data)... */
	if (!(stack_item = idx_pop(stack))) {
		xtWord1 *ditem;

		/* New root */
		if (!idx_new_branch(ot, ind, &new_branch))
			goto failed;

		ditem = ot->ot_ind_wbuf.tb_data;
		XT_SET_NODE_REF(ot->ot_table, ditem, ind->mi_root);
		ditem += XT_NODE_REF_SIZE;
		ditem += idx_write_branch_item(ind, ditem, key_value);
		XT_SET_NODE_REF(ot->ot_table, ditem, branch);
		ditem += XT_NODE_REF_SIZE;
		size = ditem - ot->ot_ind_wbuf.tb_data;
		XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_NODE_SIZE(size));
		IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
		if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + size, (xtWord1 *) &ot->ot_ind_wbuf))
			goto failed_2;
		ind->mi_root = new_branch;
		goto done_ok;
	}

	current = stack_item->i_branch;
	/* This read does not count (towards ot_ind_reads), because we are only
	 * counting each loaded page once. We assume that the page is in
	 * cache, and will remain in cache when we read again below for the
	 * purpose of update.
	 */
	if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
		goto failed;
	ASSERT_NS(XT_IS_NODE(XT_GET_DISK_2(iref.ir_branch->tb_size_2)));
#ifdef IND_SKEW_SPLIT_ON_APPEND
	result.sr_last_item = last_item;
#endif
	ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, key_value, &result);

	if (result.sr_item.i_total_size + key_value->sv_length + XT_RECORD_REF_SIZE + result.sr_item.i_node_ref_size <= XT_INDEX_PAGE_DATA_SIZE) {
		if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
			ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
			if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
				goto failed_1;
		}

		if (iref.ir_block->cb_handle_count) {
			if (!xt_ind_copy_on_write(&iref))
				goto failed_1;
		}

		idx_insert_node_item(ot->ot_table, ind, iref.ir_branch, key_value, &result, branch);
		IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
#ifdef IND_OPT_DATA_WRITTEN
		iref.ir_block->cb_header = TRUE;
		if (result.sr_item.i_item_offset < iref.ir_block->cb_min_pos)
			iref.ir_block->cb_min_pos = result.sr_item.i_item_offset;
		iref.ir_block->cb_max_pos = result.sr_item.i_total_size;
		ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
		ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
#endif
		iref.ir_updated = TRUE;
		ASSERT_NS(result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE);
		xt_ind_release(ot, ind, XT_UNLOCK_R_UPDATE, &iref);
		goto done_ok;
	}

	memcpy(&ot->ot_ind_wbuf, iref.ir_branch, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_total_size);
	idx_insert_node_item(ot->ot_table, ind, &ot->ot_ind_wbuf, key_value, &result, branch);
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
	ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE);
#ifdef IND_OPT_DATA_WRITTEN
	new_min_pos = result.sr_item.i_item_offset;
#endif

	/* We assume that value can be overwritten (which is the case) */
	if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, key_value, &result))
		goto failed_1;

	if (!idx_new_branch(ot, ind, &new_branch))
		goto failed_1;

	/* Split the node: */
	new_size = result.sr_item.i_total_size - result.sr_item.i_item_offset - result.sr_item.i_item_size;
	new_branch_ptr = (XTIdxBranchDPtr) &ot->ot_ind_wbuf.tb_data[XT_INDEX_PAGE_DATA_SIZE];
	memmove(new_branch_ptr->tb_data, &ot->ot_ind_wbuf.tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size], new_size);

	XT_SET_DISK_2(new_branch_ptr->tb_size_2, XT_MAKE_NODE_SIZE(new_size));
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(new_branch_ptr->tb_size_2));
	if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + new_size, (xtWord1 *) new_branch_ptr))
		goto failed_2;

	/* Change the size of the old branch: */
	XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_NODE_SIZE(result.sr_item.i_item_offset));
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));

	if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
			goto failed_2;
	}

	if (iref.ir_block->cb_handle_count) {
		if (!xt_ind_copy_on_write(&iref))
			goto failed_2;
	}

#ifdef IND_OPT_DATA_WRITTEN
	if (result.sr_item.i_item_offset < new_min_pos)
		new_min_pos = result.sr_item.i_item_offset;
#endif
	memcpy(iref.ir_branch, &ot->ot_ind_wbuf, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_item_offset);
#ifdef IND_OPT_DATA_WRITTEN
	iref.ir_block->cb_header = TRUE;
	if (new_min_pos < iref.ir_block->cb_min_pos)
		iref.ir_block->cb_min_pos = new_min_pos;
	iref.ir_block->cb_max_pos = result.sr_item.i_item_offset;
	ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
	ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
#endif
	iref.ir_updated = TRUE;
	xt_ind_release(ot, ind, XT_UNLOCK_R_UPDATE, &iref);

	/* Insert the new branch into the parent node, using the new middle key value: */
	if (!idx_insert_node(ot, ind, stack, last_item, key_value, new_branch)) {
		// Index may be inconsistent now...
		idx_free_branch(ot, ind, new_branch);
		goto failed;
	}

	done_ok:
	return OK;

	failed_2:
	idx_free_branch(ot, ind, new_branch);

	failed_1:
	xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);

	failed:
	return FAILED;
}

#define IDX_MAX_INDEX_FLUSH_COUNT		10

struct IdxTableItem {
	xtTableID		ti_tab_id;
	u_int			ti_dirty_blocks;
};

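/* table_list[] in idx_out_of_memory_failure() below is kept sorted by
 * ti_dirty_blocks in descending order, so the tables with the most dirty
 * index blocks are flushed first when the index cache runs full.
 */
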
inline u_int idx_dirty_blocks(XTTableHPtr tab)
{
	XTIndexPtr	*indp;
	XTIndexPtr	ind;
	u_int		dirty_blocks;

	dirty_blocks = 0;
	indp = tab->tab_dic.dic_keys;
	for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
		ind = *indp;
		dirty_blocks += ind->mi_dirty_blocks;
	}
	return dirty_blocks;
}

static xtBool idx_out_of_memory_failure(XTOpenTablePtr ot)
{
#ifdef XT_TRACK_INDEX_UPDATES
	/* If the index has been changed when we run out of memory, we
	 * will corrupt the index!
	 */
	ASSERT_NS(ot->ot_ind_changed == 0);
#endif
	if (ot->ot_thread->t_exception.e_xt_err == XT_ERR_NO_INDEX_CACHE) {
		u_int block_total = xt_ind_get_blocks();

		/* Flush index and retry! */
		xt_clear_exception(ot->ot_thread);

		if (idx_dirty_blocks(ot->ot_table) >= block_total / 4) {
			if (!xt_async_flush_indices(ot->ot_table, FALSE, TRUE, ot->ot_thread))
				return FAILED;
			if (!xt_wait_for_async_task_results(ot->ot_thread))
				return FAILED;
		}
		else {
			XTDatabaseHPtr	db = ot->ot_table->tab_db;
			IdxTableItem	table_list[IDX_MAX_INDEX_FLUSH_COUNT];
			int				item_count = 0;
			int				i;
			u_int			edx;
			XTTableEntryPtr	tab_ptr;
			u_int			dirty_blocks;
			u_int			dirty_total = 0;

			xt_ht_lock(NULL, db->db_tables);
			xt_enum_tables_init(&edx);
			while ((tab_ptr = xt_enum_tables_next(NULL, db, &edx))) {
				if (tab_ptr->te_table) {
					if (tab_ptr->te_table->tab_ind_flush_task->tk_is_running()) {
						if (!(dirty_blocks = tab_ptr->te_table->tab_ind_flush_task->fit_dirty_blocks))
							dirty_blocks = idx_dirty_blocks(tab_ptr->te_table);
					}
					else
						dirty_blocks = idx_dirty_blocks(tab_ptr->te_table);
					dirty_total += dirty_blocks;
					if (dirty_blocks) {
						for (i=0; i<item_count; i++) {
							if (table_list[i].ti_dirty_blocks < dirty_blocks)
								break;
						}
						if (i < IDX_MAX_INDEX_FLUSH_COUNT) {
							int cnt;

							if (item_count < IDX_MAX_INDEX_FLUSH_COUNT) {
								cnt = item_count - i;
								item_count++;
							}
							else
								cnt = item_count - i - 1;
							/* Make room at position i by shifting the entries
							 * with fewer dirty blocks down one slot:
							 */
							memmove(&table_list[i+1], &table_list[i], sizeof(IdxTableItem) * cnt);
							table_list[i].ti_tab_id = tab_ptr->te_table->tab_id;
							table_list[i].ti_dirty_blocks = dirty_blocks;
						}
					}
					if (dirty_total >= block_total / 4)
						break;
				}
			}
			xt_ht_unlock(NULL, db->db_tables);
			if (dirty_total >= block_total / 4) {
				for (i=0; i<item_count; i++) {
					if (table_list[i].ti_tab_id == ot->ot_table->tab_id) {
						if (!xt_async_flush_indices(ot->ot_table, FALSE, TRUE, ot->ot_thread))
							return FAILED;
					}
					else {
						XTTableHPtr tab;
						xtBool		ok;

						if ((tab = xt_use_table_by_id_ns(db, table_list[i].ti_tab_id))) {
							ok = xt_async_flush_indices(tab, FALSE, TRUE, ot->ot_thread);
							xt_heap_release_ns(tab);
						}
					}
				}
				if (!xt_wait_for_async_task_results(ot->ot_thread))
					return FAILED;
			}
		}

		return TRUE;
	}
	return FALSE;
}

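/* The typical usage pattern for idx_out_of_memory_failure() can be seen
 * in the error paths of the functions below:
 *
 *   failed:
 *   XT_INDEX_UNLOCK(ind, ot);
 *   if (idx_out_of_memory_failure(ot))
 *       goto retry_after_oom;
 *   return FAILED;
 *
 * A TRUE result means the error was XT_ERR_NO_INDEX_CACHE and dirty
 * index blocks have been flushed, so a retry has a chance of succeeding.
 */
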
/*
 * Check all the duplicate variations in an index.
 * If one of them is visible, then we have a duplicate key
 * error.
 *
 * GOTCHA: This routine must use the write index buffer!
 */
static xtBool idx_check_duplicates(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxKeyValuePtr key_value)
{
	IdxBranchStackRec	stack;
	xtIndexNodeID		current;
	XTIndReferenceRec	iref;
	XTIdxResultRec		result;
	xtBool				on_key = FALSE;
	xtXactID			xn_id;
	int					save_flags;
	XTXactWaitRec		xw;

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
	retry:
	idx_newstack(&stack);

	if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
		return OK;

	save_flags = key_value->sv_flags;
	key_value->sv_flags = 0;

	while (XT_NODE_ID(current)) {
		if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref)) {
			key_value->sv_flags = save_flags;
			return FAILED;
		}
		ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, key_value, &result);
		if (result.sr_found)
			/* If we have found the key in a node: */
			on_key = TRUE;
		if (!result.sr_item.i_node_ref_size)
			break;
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		if (!idx_push(&stack, current, &result.sr_item)) {
			key_value->sv_flags = save_flags;
			return FAILED;
		}
		current = result.sr_branch;
	}

	key_value->sv_flags = save_flags;

	if (!on_key) {
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		return OK;
	}

	for (;;) {
		if (result.sr_item.i_item_offset == result.sr_item.i_total_size) {
			IdxStackItemPtr node;

			/* We are at the end of a leaf node.
			 * Go up the stack to find the start position of the next key.
			 * If we find none, then we are at the end of the index.
			 */
			xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
			while ((node = idx_pop(&stack))) {
				if (node->i_pos.i_item_offset < node->i_pos.i_total_size) {
					current = node->i_branch;
					if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
						return FAILED;
					xt_get_res_record_ref(&iref.ir_branch->tb_data[node->i_pos.i_item_offset + node->i_pos.i_item_size - XT_RECORD_REF_SIZE], &result);
					result.sr_item = node->i_pos;
					goto check_value;
				}
			}
			break;
		}

		check_value:
		/* Quit the loop if the key is no longer matched! */
		if (myxt_compare_key(ind, 0, key_value->sv_length, key_value->sv_key, &iref.ir_branch->tb_data[result.sr_item.i_item_offset]) != 0) {
			xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
			break;
		}

		if (ind->mi_lazy_delete) {
			if (result.sr_row_id == (xtRowID) -1)
				goto next_item;
		}

		switch (xt_tab_maybe_committed(ot, result.sr_rec_id, &xn_id, NULL, NULL)) {
			case XT_MAYBE:
				/* Record is not committed, wait for the transaction. */
				xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
				XT_INDEX_UNLOCK(ind, ot);
				xw.xw_xn_id = xn_id;
				if (!xt_xn_wait_for_xact(ot->ot_thread, &xw, NULL)) {
					XT_INDEX_WRITE_LOCK(ind, ot);
					return FAILED;
				}
				XT_INDEX_WRITE_LOCK(ind, ot);
				goto retry;
			case XT_ERR:
				/* Error while reading... */
				goto failed;
			case TRUE:
				/* Record is committed or belongs to me, duplicate key: */
				XT_DEBUG_TRACE(("DUPLICATE KEY tx=%d rec=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_xn_id, (int) result.sr_rec_id));
				xt_register_xterr(XT_REG_CONTEXT, XT_ERR_DUPLICATE_KEY);
				goto failed;
			case FALSE:
				/* Record is deleted or rolled-back: */
				break;
		}

		next_item:
		idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);

		if (result.sr_item.i_node_ref_size) {
			/* Go down to the bottom: */
			while (XT_NODE_ID(current)) {
				xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
				if (!idx_push(&stack, current, &result.sr_item))
					return FAILED;
				current = result.sr_branch;
				if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
					return FAILED;
				idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
				if (!result.sr_item.i_node_ref_size)
					break;
			}
		}
	}

	return OK;

	failed:
	xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
	return FAILED;
}

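/* Summary of the visibility check above: xt_tab_maybe_committed()
 * classifies each duplicate candidate as XT_MAYBE (uncommitted: release
 * all locks, wait for the other transaction, then retry the whole scan),
 * XT_ERR (fail), TRUE (committed or our own: a genuine duplicate key
 * error), or FALSE (deleted or rolled back: skip to the next variation).
 */
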
inline static void idx_still_on_key(XTIndexPtr ind, register XTIdxSearchKeyPtr search_key, register XTIdxBranchDPtr branch, register XTIdxItemPtr item)
{
	if (search_key && search_key->sk_on_key) {
		search_key->sk_on_key = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length,
			search_key->sk_key_value.sv_key, &branch->tb_data[item->i_item_offset]) == 0;
	}
}

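/* sk_on_key tracks whether the cursor is still positioned on an item
 * whose key matches the original search key; scans use it to detect
 * when they have stepped past the last duplicate of that key.
 */
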
/*
 * Insert a value into the given index. Return FALSE if an error occurs.
 */
xtPublic xtBool xt_idx_insert(XTOpenTablePtr ot, XTIndexPtr ind, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_buf, xtWord1 *bef_buf, xtBool allow_dups)
{
	XTIdxKeyValueRec	key_value;
	xtWord1				key_buf[XT_INDEX_MAX_KEY_SIZE];
	IdxBranchStackRec	stack;
	xtIndexNodeID		current;
	XTIndReferenceRec	iref;
	xtIndexNodeID		new_branch;
	XTIdxBranchDPtr		new_branch_ptr;
	size_t				size;
	XTIdxResultRec		result;
	size_t				new_size;
	xtBool				check_for_dups = ind->mi_flags & (HA_UNIQUE_CHECK | HA_NOSAME) && !allow_dups;
	xtBool				lock_structure = FALSE;
	xtBool				updated = FALSE;
#ifdef IND_OPT_DATA_WRITTEN
	u_int				new_min_pos;
#endif

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
#ifdef CHECK_AND_PRINT
	//idx_check_index(ot, ind, TRUE);
#endif

	retry_after_oom:
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed = 0;
#endif
	key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
	key_value.sv_rec_id = rec_id;
	key_value.sv_row_id = row_id;		/* Should always be zero on insert (it will be updated by the sweeper later).
										 * Non-zero only during recovery, assuming that the sweeper will process such records right after recovery.
										 */
	key_value.sv_key = key_buf;
	key_value.sv_length = myxt_create_key_from_row(ind, key_buf, rec_buf, &check_for_dups);

	if (bef_buf && check_for_dups) {
		/* If we have a before image, and we are required to check for duplicates,
		 * then compare the before image key with the after image key.
		 */
		xtWord1	bef_key_buf[XT_INDEX_MAX_KEY_SIZE];
		u_int	len;
		xtBool	has_no_null = TRUE;

		len = myxt_create_key_from_row(ind, bef_key_buf, bef_buf, &has_no_null);
		if (has_no_null) {
			/* If the before key has no null values, then compare with the after key value.
			 * We only have to check for duplicates if the key has changed!
			 */
			check_for_dups = myxt_compare_key(ind, 0, len, bef_key_buf, key_buf) != 0;
		}
	}

	/* The index appears to have no root: */
	if (!XT_NODE_ID(ind->mi_root))
		lock_structure = TRUE;

	lock_and_retry:
	idx_newstack(&stack);

	/* A write lock is only required if we are going to change the
	 * structure of the index!
	 */
	if (lock_structure)
		XT_INDEX_WRITE_LOCK(ind, ot);
	else
		XT_INDEX_READ_LOCK(ind, ot);

	retry:
	/* Create a root node if required: */
	if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
		/* Index is empty, create a new one: */
		ASSERT_NS(lock_structure);
		if (!xt_ind_reserve(ot, 1, NULL))
			goto failed;
		if (!idx_new_branch(ot, ind, &new_branch))
			goto failed;
		size = idx_write_branch_item(ind, ot->ot_ind_wbuf.tb_data, &key_value);
		XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_LEAF_SIZE(size));
		IDX_TRACE("%d-> %x\n", (int) new_branch, (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
		if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + size, (xtWord1 *) &ot->ot_ind_wbuf))
			goto failed_2;
		ind->mi_root = new_branch;
		goto done_ok;
	}

	/* Search down the tree for the insertion point. */
#ifdef IND_SKEW_SPLIT_ON_APPEND
	result.sr_last_item = TRUE;
#endif
	while (XT_NODE_ID(current)) {
		if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_LEAF, &iref))
			goto failed;
		ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
		if (result.sr_duplicate) {
			if (check_for_dups) {
				/* Duplicates are not allowed, at least one has been
				 * found...
				 */

				/* Leaf nodes (i_node_ref_size == 0) are write locked,
				 * non-leaf nodes are read locked.
				 */
				xt_ind_release(ot, ind, result.sr_item.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, &iref);

				if (!idx_check_duplicates(ot, ind, &key_value))
					goto failed;
				/* We have checked all the "duplicate" variations. None of them are
				 * relevant. So this will cause a correct insert.
				 */
				check_for_dups = FALSE;
				idx_newstack(&stack);
				goto retry;
			}
		}
		if (result.sr_found) {
			/* Node found, can happen during recovery of indexes!
			 * We have found an exact match of both key and record.
			 */
			XTPageUnlockType	utype;
			xtBool				overwrite = FALSE;

			/* {LAZY-DEL-INDEX-ITEMS}
			 * If the item has been lazy deleted, then just overwrite!
			 */
			if (result.sr_row_id == (xtRowID) -1) {
				xtWord2 del_count;

				/* This is safe because we have an xlock on the leaf. */
				if ((del_count = iref.ir_block->cp_del_count))
					iref.ir_block->cp_del_count = del_count-1;
				overwrite = TRUE;
			}

			if (!result.sr_row_id && row_id) {
				/* {INDEX-RECOV_ROWID} Set the row-id
				 * during recovery, even if the index entry
				 * is not committed.
				 * It will be removed later by the sweeper.
				 */
				overwrite = TRUE;
			}

			if (overwrite) {
				if (!idx_set_item_row_id(ot, ind, &iref, &result.sr_item, row_id))
					goto failed;
				utype = result.sr_item.i_node_ref_size ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_W_UPDATE;
			}
			else
				utype = result.sr_item.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE;
			xt_ind_release(ot, ind, utype, &iref);
			goto done_ok;
		}
		/* Stop when we get to a leaf: */
		if (!result.sr_item.i_node_ref_size)
			break;
		xt_ind_release(ot, ind, result.sr_item.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, &iref);
		if (!idx_push(&stack, current, NULL))
			goto failed;
		current = result.sr_branch;
	}
	ASSERT_NS(XT_NODE_ID(current));

	/* Must be a leaf!: */
	ASSERT_NS(!result.sr_item.i_node_ref_size);

	updated = FALSE;
	if (ind->mi_lazy_delete && iref.ir_block->cp_del_count) {
		/* There are a number of possibilities:
		 * - We could just replace a lazy deleted slot.
		 * - We could compact and insert.
		 * - We could just insert
		 */

		if (result.sr_item.i_item_offset > 0) {
			/* Check if it can go into the previous node: */
			XTIdxResultRec	t_res;

			t_res.sr_item = result.sr_item;
			xt_prev_branch_item_fix(ot->ot_table, ind, iref.ir_branch, &t_res);
			if (t_res.sr_row_id != (xtRowID) -1)
				goto try_current;

			/* Yup, it can, but first check to see if it would be
			 * better to put it in the current node.
			 * This is the case if the previous node key is not the
			 * same as the key we are adding...
			 */
			if (result.sr_item.i_item_offset < result.sr_item.i_total_size &&
				result.sr_row_id == (xtRowID) -1) {
				if (!idx_cmp_item_key_fix(&iref, &t_res.sr_item, &key_value))
					goto try_current;
			}

			idx_set_item_key_fix(&iref, &t_res.sr_item, &key_value);
			iref.ir_block->cp_del_count--;
			xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
			goto done_ok;
		}

		try_current:
		if (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
			if (result.sr_row_id == (xtRowID) -1) {
				idx_set_item_key_fix(&iref, &result.sr_item, &key_value);
				iref.ir_block->cp_del_count--;
				xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
				goto done_ok;
			}
		}

		/* Check if we must compact...
		 * It makes no sense to split as long as there are lazy deleted items
		 * in the page. So, delete them if a split would otherwise be required!
		 */
		ASSERT_NS(key_value.sv_length + XT_RECORD_REF_SIZE == result.sr_item.i_item_size);
		if (result.sr_item.i_total_size + key_value.sv_length + XT_RECORD_REF_SIZE > XT_INDEX_PAGE_DATA_SIZE) {
			if (!idx_compact_leaf(ot, ind, &iref, &result.sr_item))
				goto failed;
			updated = TRUE;
		}

		/* Fall through to the insert code... */
		/* NOTE: if there were no lazy deleted items in the leaf, then
		 * idx_compact_leaf is a NOP. This is the only case in which it may not
		 * fall through and do the insert below.
		 *
		 * Normally, if the cp_del_count is correct then the insert
		 * will work below, and the assertion here will not fail.
		 *
		 * In this case, the xt_ind_release() will correctly indicate an update.
		 */
		ASSERT_NS(result.sr_item.i_total_size + key_value.sv_length + XT_RECORD_REF_SIZE <= XT_INDEX_PAGE_DATA_SIZE);
	}

	if (result.sr_item.i_total_size + key_value.sv_length + XT_RECORD_REF_SIZE <= XT_INDEX_PAGE_DATA_SIZE) {
		if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
			ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
			if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
				goto failed_1;
		}

		if (iref.ir_block->cb_handle_count) {
			if (!xt_ind_copy_on_write(&iref))
				goto failed_1;
		}

		idx_insert_leaf_item(ind, iref.ir_branch, &key_value, &result);
		IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
		ASSERT_NS(result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE);
#ifdef IND_OPT_DATA_WRITTEN
		iref.ir_block->cb_header = TRUE;
		if (result.sr_item.i_item_offset < iref.ir_block->cb_min_pos)
			iref.ir_block->cb_min_pos = result.sr_item.i_item_offset;
		iref.ir_block->cb_max_pos = result.sr_item.i_total_size;
		ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
		ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
#endif
		iref.ir_updated = TRUE;
		xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);
		goto done_ok;
	}

	/* Key does not fit. Must split the node.
	 * Make sure we have a structural lock:
	 */
	if (!lock_structure) {
		xt_ind_release(ot, ind, updated ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_WRITE, &iref);
		XT_INDEX_UNLOCK(ind, ot);
		lock_structure = TRUE;
		goto lock_and_retry;
	}

	memcpy(&ot->ot_ind_wbuf, iref.ir_branch, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_total_size);
	idx_insert_leaf_item(ind, &ot->ot_ind_wbuf, &key_value, &result);
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
	ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE && result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE*2);
#ifdef IND_OPT_DATA_WRITTEN
	new_min_pos = result.sr_item.i_item_offset;
#endif

	/* This is the number of potential writes. In other words, the total number
	 * of blocks that may be accessed.
	 *
	 * Note that this assumes that if a block is read and written soon
	 * afterwards, the block will not be freed in between (a safe assumption?)
	 */
	if (!xt_ind_reserve(ot, stack.s_top * 2 + 3, iref.ir_branch))
		goto failed_1;

	/* Key does not fit, must split... */
	if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, &key_value, &result))
		goto failed_1;

	if (!idx_new_branch(ot, ind, &new_branch))
		goto failed_1;

	/* Copy and write the rest of the data to the new node: */
	new_size = result.sr_item.i_total_size - result.sr_item.i_item_offset - result.sr_item.i_item_size;
	new_branch_ptr = (XTIdxBranchDPtr) &ot->ot_ind_wbuf.tb_data[XT_INDEX_PAGE_DATA_SIZE];
	memmove(new_branch_ptr->tb_data, &ot->ot_ind_wbuf.tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size], new_size);

	XT_SET_DISK_2(new_branch_ptr->tb_size_2, XT_MAKE_LEAF_SIZE(new_size));
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(new_branch), (int) XT_GET_DISK_2(new_branch_ptr->tb_size_2));
	if (!xt_ind_write(ot, ind, new_branch, offsetof(XTIdxBranchDRec, tb_data) + new_size, (xtWord1 *) new_branch_ptr))
		goto failed_2;

	/* Modify the first node: */
	XT_SET_DISK_2(ot->ot_ind_wbuf.tb_size_2, XT_MAKE_LEAF_SIZE(result.sr_item.i_item_offset));
	IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));

	if (iref.ir_block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		ASSERT_NS(ot->ot_table->tab_ind_flush_ilog);
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, iref.ir_block))
			goto failed_2;
	}

	if (iref.ir_block->cb_handle_count) {
		if (!xt_ind_copy_on_write(&iref))
			goto failed_2;
	}
#ifdef IND_OPT_DATA_WRITTEN
	if (result.sr_item.i_item_offset < new_min_pos)
		new_min_pos = result.sr_item.i_item_offset;
#endif
	memcpy(iref.ir_branch, &ot->ot_ind_wbuf, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_item_offset);
#ifdef IND_OPT_DATA_WRITTEN
	iref.ir_block->cb_header = TRUE;
	if (new_min_pos < iref.ir_block->cb_min_pos)
		iref.ir_block->cb_min_pos = new_min_pos;
	iref.ir_block->cb_max_pos = result.sr_item.i_item_offset;
	ASSERT_NS(iref.ir_block->cb_max_pos <= XT_INDEX_PAGE_DATA_SIZE);
	ASSERT_NS(iref.ir_block->cb_min_pos <= iref.ir_block->cb_max_pos);
#endif
	iref.ir_updated = TRUE;
	xt_ind_release(ot, ind, XT_UNLOCK_W_UPDATE, &iref);

	/* Insert the new branch into the parent node, using the new middle key value: */
	if (!idx_insert_node(ot, ind, &stack, result.sr_last_item, &key_value, new_branch)) {
		// Index may be inconsistent now...
		idx_free_branch(ot, ind, new_branch);
		goto failed;
	}

#ifdef XT_TRACK_INDEX_UPDATES
	ASSERT_NS(ot->ot_ind_reserved >= ot->ot_ind_reads);
#endif

	done_ok:
	XT_INDEX_UNLOCK(ind, ot);

#ifdef DEBUG
	//printf("INSERT OK\n");
	//idx_check_index(ot, ind, TRUE);
#endif
	xt_ind_unreserve(ot);
	return OK;

	failed_2:
	idx_free_branch(ot, ind, new_branch);

	failed_1:
	xt_ind_release(ot, ind, updated ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_WRITE, &iref);

	failed:
	XT_INDEX_UNLOCK(ind, ot);
	if (idx_out_of_memory_failure(ot))
		goto retry_after_oom;

#ifdef DEBUG
	//printf("INSERT FAILED\n");
	//idx_check_index(ot, ind, TRUE);
#endif
	xt_ind_unreserve(ot);
	return FAILED;
}


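/* Insert flow in brief: descend with XT_XLOCK_LEAF so only the leaf is
 * write locked; if the key fits, splice it into the leaf in place.
 * Otherwise restart with the structural write lock, split the leaf
 * around the middle item and push the middle key up with
 * idx_insert_node(). Duplicates are only verified when the scan reports
 * sr_duplicate and the key actually changed relative to the before image.
 */
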
/* Remove the given item in the node.
 * This is done by going down the tree to find a replacement
 * for the deleted item!
 */
static xtBool idx_remove_item_in_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackPtr stack, XTIndReferencePtr iref, XTIdxKeyValuePtr key_value)
{
	IdxStackItemPtr		delete_node;
	XTIdxResultRec		result;
	xtIndexNodeID		current;
	xtBool				lazy_delete_cleanup_required = FALSE;
	IdxStackItemPtr		current_top;

	delete_node = idx_top(stack);
	current = delete_node->i_branch;
	result.sr_item = delete_node->i_pos;

	/* Follow the branch after this item: */
	idx_next_branch_item(ot->ot_table, ind, iref->ir_branch, &result);
	xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, iref);

	/* Go down the left-hand side until we reach a leaf: */
	while (XT_NODE_ID(current)) {
		current = result.sr_branch;
		if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_LEAF, iref))
			return FAILED;
		idx_first_branch_item(ot->ot_table, ind, iref->ir_branch, &result);
		if (!result.sr_item.i_node_ref_size)
			break;
		xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
		if (!idx_push(stack, current, &result.sr_item))
			return FAILED;
	}

	ASSERT_NS(XT_NODE_ID(current));
	ASSERT_NS(!result.sr_item.i_node_ref_size);

	if (!xt_ind_reserve(ot, stack->s_top + 2, iref->ir_branch)) {
		xt_ind_release(ot, ind, XT_UNLOCK_WRITE, iref);
		return FAILED;
	}

	/* This code removes lazy deleted items from the leaf,
	 * before we promote an item to a leaf.
	 * This is not essential, but prevents lazy deleted
	 * items from being propagated up the tree.
	 */
	if (ind->mi_lazy_delete) {
		if (iref->ir_block->cp_del_count) {
			if (!idx_compact_leaf(ot, ind, iref, &result.sr_item))
				return FAILED;
		}
	}

	/* Crawl back up the stack trace, looking for a key
	 * that can be used to replace the deleted key.
	 *
	 * Any empty nodes on the way up can be removed!
	 */
	if (result.sr_item.i_total_size > 0) {
		/* There is a key in the leaf, extract it, and put it in the node: */
		memcpy(key_value->sv_key, &iref->ir_branch->tb_data[result.sr_item.i_item_offset], result.sr_item.i_item_size);
		/* This call also frees the iref.ir_branch page! */
		if (!idx_remove_branch_item_right(ot, ind, current, iref, &result.sr_item))
			return FAILED;
		if (!idx_replace_node_key(ot, ind, delete_node, stack, result.sr_item.i_item_size, key_value->sv_key))
			return FAILED;
		goto done_ok;
	}

	xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_W_UPDATE : XT_UNLOCK_WRITE, iref);

	for (;;) {
		/* The current node/leaf is empty, remove it: */
		idx_free_branch(ot, ind, current);

		current_top = idx_pop(stack);
		current = current_top->i_branch;
		if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_LEAF, iref))
			return FAILED;

		if (current_top == delete_node) {
			/* All children have been removed. Delete the key and done: */
			if (!idx_remove_branch_item_right(ot, ind, current, iref, &current_top->i_pos))
				return FAILED;
			goto done_ok;
		}

		if (current_top->i_pos.i_total_size > current_top->i_pos.i_node_ref_size) {
			/* Save the key: */
			memcpy(key_value->sv_key, &iref->ir_branch->tb_data[current_top->i_pos.i_item_offset], current_top->i_pos.i_item_size);
			/* This function also frees the cache page: */
			if (!idx_remove_branch_item_left(ot, ind, current, iref, &current_top->i_pos, &lazy_delete_cleanup_required))
				return FAILED;
			if (!idx_replace_node_key(ot, ind, delete_node, stack, current_top->i_pos.i_item_size, key_value->sv_key))
				return FAILED;
			/* */
			if (lazy_delete_cleanup_required) {
				if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, iref))
					return FAILED;
				if (!idx_remove_lazy_deleted_item_in_node(ot, ind, current, iref, key_value))
					return FAILED;
			}
			goto done_ok;
		}
		xt_ind_release(ot, ind, current_top->i_pos.i_node_ref_size ? XT_UNLOCK_READ : XT_UNLOCK_WRITE, iref);
	}

	done_ok:
#ifdef XT_TRACK_INDEX_UPDATES
	ASSERT_NS(ot->ot_ind_reserved >= ot->ot_ind_reads);
#endif
	return OK;
}

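/* The replacement strategy above is the classic b-tree "borrow the
 * in-order successor": follow the branch just after the deleted item
 * down its left-hand side to a leaf, take the first key of that leaf,
 * remove it there, and write it over the deleted key via
 * idx_replace_node_key(). Any pages left empty on the way up are freed.
 */
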
/*
 * This function assumes we have a lock on the structure of the index.
 */
static xtBool idx_remove_lazy_deleted_item_in_node(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID current, XTIndReferencePtr iref, XTIdxKeyValuePtr key_value)
{
	IdxBranchStackRec	stack;
	XTIdxResultRec		result;

	/* Now remove all lazy deleted items in this node.... */
	idx_first_branch_item(ot->ot_table, ind, (XTIdxBranchDPtr) iref->ir_block->cb_data, &result);

	for (;;) {
		while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
			if (result.sr_row_id == (xtRowID) -1)
				goto remove_item;
			idx_next_branch_item(ot->ot_table, ind, (XTIdxBranchDPtr) iref->ir_block->cb_data, &result);
		}
		break;

		remove_item:

		idx_newstack(&stack);
		if (!idx_push(&stack, current, &result.sr_item)) {
			xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, iref);
			return FAILED;
		}

		if (!idx_remove_item_in_node(ot, ind, &stack, iref, key_value))
			return FAILED;

		/* Go back up to the node we are trying to clear
		 * of lazy deleted items.
		 */
		if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, iref))
			return FAILED;
		/* Load the data again: */
		idx_reload_item_fix(ind, iref->ir_branch, &result);
	}

	xt_ind_release(ot, ind, iref->ir_updated ? XT_UNLOCK_R_UPDATE : XT_UNLOCK_READ, iref);
	return OK;
}

static xtBool idx_delete(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxKeyValuePtr key_value)
{
	IdxBranchStackRec	stack;
	xtIndexNodeID		current;
	XTIndReferenceRec	iref;
	XTIdxResultRec		result;
	xtBool				lock_structure = FALSE;

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
	/* The index appears to have no root: */
	if (!XT_NODE_ID(ind->mi_root))
		lock_structure = TRUE;

	lock_and_retry:
	idx_newstack(&stack);

	if (lock_structure)
		XT_INDEX_WRITE_LOCK(ind, ot);
	else
		XT_INDEX_READ_LOCK(ind, ot);

	if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
		goto done_ok;

	while (XT_NODE_ID(current)) {
		if (!xt_ind_fetch(ot, ind, current, XT_XLOCK_DEL_LEAF, &iref))
			goto failed;
		ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, key_value, &result);
		if (!result.sr_item.i_node_ref_size) {
			/* A leaf... */
			if (result.sr_found) {
				if (ind->mi_lazy_delete) {
					/* If we have a W lock, then fetch decided that we
					 * need to compact the page.
					 * The decision is made by xt_idx_lazy_delete_on_leaf()
					 */
					if (!iref.ir_xlock) {
						if (!idx_lazy_delete_branch_item(ot, ind, &iref, &result.sr_item))
							goto failed;
					}
					else {
						if (!iref.ir_block->cp_del_count) {
							if (!idx_remove_branch_item_right(ot, ind, current, &iref, &result.sr_item))
								goto failed;
						}
						else {
							if (!idx_lazy_remove_leaf_item_right(ot, ind, &iref, &result.sr_item))
								goto failed;
						}
					}
				}
				else {
					if (!idx_remove_branch_item_right(ot, ind, current, &iref, &result.sr_item))
						goto failed;
				}
			}
			else
				xt_ind_release(ot, ind, iref.ir_xlock ? XT_UNLOCK_WRITE : XT_UNLOCK_READ, &iref);
			goto done_ok;
		}
		if (!idx_push(&stack, current, &result.sr_item)) {
			xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
			goto failed;
		}
		if (result.sr_found)
			/* If we have found the key in a node: */
			break;
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		current = result.sr_branch;
	}

	/* Must be a non-leaf!: */
	ASSERT_NS(result.sr_item.i_node_ref_size);

	if (ind->mi_lazy_delete) {
		if (!idx_lazy_delete_on_node(ind, iref.ir_block, &result.sr_item)) {
			/* We need to remove some items from this node: */

			if (!lock_structure) {
				xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
				XT_INDEX_UNLOCK(ind, ot);
				lock_structure = TRUE;
				goto lock_and_retry;
			}

			if (!idx_set_item_deleted(ot, ind, &iref, &result.sr_item))
				goto failed;
			if (!idx_remove_lazy_deleted_item_in_node(ot, ind, current, &iref, key_value))
				goto failed;
			goto done_ok;
		}

		if (!ot->ot_table->tab_dic.dic_no_lazy_delete) {
			/* {LAZY-DEL-INDEX-ITEMS}
			 * We just set the item to deleted, this is a significant time
			 * saver.
			 * But this item can only be cleaned up when all
			 * items on the node below are deleted.
			 */
			if (!idx_lazy_delete_branch_item(ot, ind, &iref, &result.sr_item))
				goto failed;
			goto done_ok;
		}
	}

	/* We will have to remove the key from a non-leaf node,
	 * which means we are changing the structure of the index.
	 * Make sure we have a structural lock:
	 */
	if (!lock_structure) {
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		XT_INDEX_UNLOCK(ind, ot);
		lock_structure = TRUE;
		goto lock_and_retry;
	}

	/* This is the item we will have to replace: */
	if (!idx_remove_item_in_node(ot, ind, &stack, &iref, key_value))
		goto failed;

	done_ok:
	XT_INDEX_UNLOCK(ind, ot);

#ifdef DEBUG
	//printf("DELETE OK\n");
	//idx_check_index(ot, ind, TRUE);
#endif
	xt_ind_unreserve(ot);
	return OK;

	failed:
	XT_INDEX_UNLOCK(ind, ot);
	xt_ind_unreserve(ot);
	return FAILED;
}

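/* Note on lazy deletion: throughout this file a row id of (xtRowID) -1
 * marks an index item as lazily deleted ({LAZY-DEL-INDEX-ITEMS}). The
 * item stays physically in the page, cp_del_count on the cache block
 * counts such items, scans skip them, inserts may overwrite them, and
 * the page is compacted when a split would otherwise be required.
 */
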
xtPublic xtBool xt_idx_delete(XTOpenTablePtr ot, XTIndexPtr ind, xtRecordID rec_id, xtWord1 *rec_buf)
{
	XTIdxKeyValueRec	key_value;
	xtWord1				key_buf[XT_INDEX_MAX_KEY_SIZE + XT_MAX_RECORD_REF_SIZE];

	retry_after_oom:
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed = 0;
#endif

	key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
	key_value.sv_rec_id = rec_id;
	key_value.sv_row_id = 0;
	key_value.sv_key = key_buf;
	key_value.sv_length = myxt_create_key_from_row(ind, key_buf, rec_buf, NULL);

	if (!idx_delete(ot, ind, &key_value)) {
		if (idx_out_of_memory_failure(ot))
			goto retry_after_oom;
		return FAILED;
	}
	return OK;
}

xtPublic xtBool xt_idx_update_row_id(XTOpenTablePtr ot, XTIndexPtr ind, xtRecordID rec_id, xtRowID row_id, xtWord1 *rec_buf)
{
	xtIndexNodeID		current;
	XTIndReferenceRec	iref;
	XTIdxResultRec		result;
	XTIdxKeyValueRec	key_value;
	xtWord1				key_buf[XT_INDEX_MAX_KEY_SIZE + XT_MAX_RECORD_REF_SIZE];

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
#ifdef CHECK_AND_PRINT
	idx_check_index(ot, ind, TRUE);
#endif
	retry_after_oom:
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed = 0;
#endif
	key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
	key_value.sv_rec_id = rec_id;
	key_value.sv_row_id = 0;
	key_value.sv_key = key_buf;
	key_value.sv_length = myxt_create_key_from_row(ind, key_buf, rec_buf, NULL);

	/* NOTE: Only a read lock is required for this!!
	 *
	 * 09.05.2008 - This has changed because the dirty list now
	 * hangs on the index. And the dirty list may be updated
	 * by any change of the index.
	 * However, the advantage is that I should be able to read
	 * lock in the first phase of the flush.
	 *
	 * 18.02.2009 - This has changed again.
	 * I am now using a read lock, because this update does not
	 * require a structural change. In fact, it does not even
	 * need a WRITE LOCK on the page affected, because there
	 * is only ONE thread that can do this (the sweeper).
	 *
	 * This has the advantage that the sweeper (which uses this
	 * function) causes fewer conflicts.
	 *
	 * However, it does mean that the dirty list must be otherwise
	 * protected (which it now is, by a spin lock - mi_dirty_lock).
	 *
	 * It also has the disadvantage that I am going to have to
	 * take an xlock in the first phase of the flush.
	 */
	XT_INDEX_READ_LOCK(ind, ot);

	if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
		goto done_ok;

	while (XT_NODE_ID(current)) {
		if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
			goto failed;
		ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
		if (result.sr_found || !result.sr_item.i_node_ref_size)
			break;
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		current = result.sr_branch;
	}

	if (result.sr_found) {
		/* TODO: Check that concurrent reads can handle this!
		 * assuming the write is not atomic.
		 */
		if (!idx_set_item_row_id(ot, ind, &iref, &result.sr_item, row_id))
			goto failed;
		xt_ind_release(ot, ind, XT_UNLOCK_R_UPDATE, &iref);
	}
	else
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);

	done_ok:
	XT_INDEX_UNLOCK(ind, ot);

#ifdef DEBUG
	//idx_check_index(ot, ind, TRUE);
	//idx_check_on_key(ot);
#endif
	return OK;

	failed:
	XT_INDEX_UNLOCK(ind, ot);
	if (idx_out_of_memory_failure(ot))
		goto retry_after_oom;
	return FAILED;
}

xtPublic void xt_idx_prep_key(XTIndexPtr ind, register XTIdxSearchKeyPtr search_key, int flags, xtWord1 *in_key_buf, size_t in_key_length)
{
	search_key->sk_key_value.sv_flags = flags;
	search_key->sk_key_value.sv_rec_id = 0;
	search_key->sk_key_value.sv_row_id = 0;
	search_key->sk_key_value.sv_key = search_key->sk_key_buf;
	search_key->sk_key_value.sv_length = myxt_create_key_from_key(ind, search_key->sk_key_buf, in_key_buf, in_key_length);
	search_key->sk_on_key = FALSE;
}

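/* A minimal usage sketch (hypothetical caller; the buffer and length
 * names are illustrative only):
 *
 *   XTIdxSearchKeyRec search_key;
 *
 *   xt_idx_prep_key(ind, &search_key, XT_SEARCH_WHOLE_KEY, key_buf, key_len);
 *   if (!xt_idx_search(ot, ind, &search_key))
 *       return FAILED;
 *   // On success ot->ot_curr_rec_id/ot_curr_row_id identify the match,
 *   // and search_key.sk_on_key says whether the key itself was found.
 */
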
xtPublic xtBool xt_idx_research(XTOpenTablePtr ot, XTIndexPtr ind)
{
	XTIdxSearchKeyRec search_key;

	xt_ind_lock_handle(ot->ot_ind_rhandle);
	search_key.sk_key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
	xt_get_record_ref(&ot->ot_ind_rhandle->ih_branch->tb_data[ot->ot_ind_state.i_item_offset + ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE],
		&search_key.sk_key_value.sv_rec_id, &search_key.sk_key_value.sv_row_id);
	search_key.sk_key_value.sv_key = search_key.sk_key_buf;
	search_key.sk_key_value.sv_length = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
	search_key.sk_on_key = FALSE;
	memcpy(search_key.sk_key_buf, &ot->ot_ind_rhandle->ih_branch->tb_data[ot->ot_ind_state.i_item_offset], search_key.sk_key_value.sv_length);
	xt_ind_unlock_handle(ot->ot_ind_rhandle);
	return xt_idx_search(ot, ind, &search_key);
}

/*
 * Search for a given key and position the current pointer on the first
 * key in the list of duplicates. If the key is not found the current
 * pointer is placed at the first position after the key.
 */
xtPublic xtBool xt_idx_search(XTOpenTablePtr ot, XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
{
	IdxBranchStackRec	stack;
	xtIndexNodeID		current;
	XTIndReferenceRec	iref;
	XTIdxResultRec		result;

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
	if (ot->ot_ind_rhandle) {
		xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, ot->ot_thread);
		ot->ot_ind_rhandle = NULL;
	}
#ifdef DEBUG
	//idx_check_index(ot, ind, TRUE);
#endif

	/* When calling from recovery, this is not the case.
	 * But an index read does not require a transaction!
	 * Only insert requires one, to check for duplicates.
	if (!ot->ot_thread->st_xact_data) {
		xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
		return FAILED;
	}
	*/

	retry_after_oom:
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed = 0;
#endif
	idx_newstack(&stack);

	ot->ot_curr_rec_id = 0;
	ot->ot_curr_row_id = 0;

	XT_INDEX_READ_LOCK(ind, ot);

	if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
		goto done_ok;

	while (XT_NODE_ID(current)) {
		if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
			goto failed;
		ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &search_key->sk_key_value, &result);
		if (result.sr_found)
			/* If we have found the key in a node: */
			search_key->sk_on_key = TRUE;
		if (!result.sr_item.i_node_ref_size)
			break;
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		if (!idx_push(&stack, current, &result.sr_item))
			goto failed;
		current = result.sr_branch;
	}

	if (ind->mi_lazy_delete) {
		ignore_lazy_deleted_items:
		while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
			if (result.sr_row_id != (xtRowID) -1) {
				idx_still_on_key(ind, search_key, iref.ir_branch, &result.sr_item);
				break;
			}
			idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
		}
	}

	if (result.sr_item.i_item_offset == result.sr_item.i_total_size) {
		IdxStackItemPtr node;

		/* We are at the end of a leaf node.
		 * Go up the stack to find the start position of the next key.
		 * If we find none, then we are at the end of the index.
		 */
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		while ((node = idx_pop(&stack))) {
			if (node->i_pos.i_item_offset < node->i_pos.i_total_size) {
				if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
					goto failed;
				xt_get_res_record_ref(&iref.ir_branch->tb_data[node->i_pos.i_item_offset + node->i_pos.i_item_size - XT_RECORD_REF_SIZE], &result);

				if (ind->mi_lazy_delete) {
					result.sr_item = node->i_pos;
					if (result.sr_row_id == (xtRowID) -1) {
						/* If this node pointer is lazy deleted, then
						 * go down the next branch...
						 */
						idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);

						/* Go down to the bottom: */
						current = node->i_branch;
						while (XT_NODE_ID(current)) {
							xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
							if (!idx_push(&stack, current, &result.sr_item))
								goto failed;
							current = result.sr_branch;
							if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
								goto failed;
							idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
							if (!result.sr_item.i_node_ref_size)
								break;
						}

						goto ignore_lazy_deleted_items;
					}
					idx_still_on_key(ind, search_key, iref.ir_branch, &result.sr_item);
				}

				ot->ot_curr_rec_id = result.sr_rec_id;
				ot->ot_curr_row_id = result.sr_row_id;
				ot->ot_ind_state = node->i_pos;

				/* Convert the pointer to a handle which can be used in later operations: */
				ASSERT_NS(!ot->ot_ind_rhandle);
				if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
					goto failed;
				/* Keep the node for next operations: */
				/*
				branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
				memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
				xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
				*/
				break;
			}
		}
	}
	else {
		ot->ot_curr_rec_id = result.sr_rec_id;
		ot->ot_curr_row_id = result.sr_row_id;
		ot->ot_ind_state = result.sr_item;

		/* Convert the pointer to a handle which can be used in later operations: */
		ASSERT_NS(!ot->ot_ind_rhandle);
		if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
			goto failed;
		/* Keep the node for next operations: */
		/*
		branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
		memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		*/
	}

	done_ok:
	XT_INDEX_UNLOCK(ind, ot);

#ifdef DEBUG
	//idx_check_index(ot, ind, TRUE);
	//idx_check_on_key(ot);
#endif
	ASSERT_NS(iref.ir_xlock == 2);
	ASSERT_NS(iref.ir_updated == 2);
	return OK;

	failed:
	XT_INDEX_UNLOCK(ind, ot);
	if (idx_out_of_memory_failure(ot))
		goto retry_after_oom;
	ASSERT_NS(iref.ir_xlock == 2);
	ASSERT_NS(iref.ir_updated == 2);
	return FAILED;
}

3091
xtPublic xtBool xt_idx_search_prev(XTOpenTablePtr ot, XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
{
	IdxBranchStackRec	stack;
	xtIndexNodeID		current;
	XTIndReferenceRec	iref;
	XTIdxResultRec		result;

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
	if (ot->ot_ind_rhandle) {
		xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, ot->ot_thread);
		ot->ot_ind_rhandle = NULL;
	}
#ifdef DEBUG
	//idx_check_index(ot, ind, TRUE);
#endif

	/* see the comment above in xt_idx_search */
	/*
	if (!ot->ot_thread->st_xact_data) {
		xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
		return FAILED;
	}
	*/

	retry_after_oom:
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed = 0;
#endif
	idx_newstack(&stack);

	ot->ot_curr_rec_id = 0;
	ot->ot_curr_row_id = 0;

	XT_INDEX_READ_LOCK(ind, ot);

	if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
		goto done_ok;

	while (XT_NODE_ID(current)) {
		if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
			goto failed;
		ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &search_key->sk_key_value, &result);
		if (result.sr_found)
			/* If we have found the key in a node: */
			search_key->sk_on_key = TRUE;
		if (!result.sr_item.i_node_ref_size)
			break;
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		if (!idx_push(&stack, current, &result.sr_item))
			goto failed;
		current = result.sr_branch;
	}

	if (result.sr_item.i_item_offset == 0) {
		IdxStackItemPtr node;

		search_up_stack:
		/* We are at the start of a leaf node.
		 * Go up the stack to find the position of the previous key.
		 * If we find none, then we are at the start of the index.
		 */
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		while ((node = idx_pop(&stack))) {
			if (node->i_pos.i_item_offset > node->i_pos.i_node_ref_size) {
				if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
					goto failed;
				result.sr_item = node->i_pos;
				ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);

				if (ind->mi_lazy_delete) {
					if (result.sr_row_id == (xtRowID) -1) {
						/* Go down to the bottom, in order to scan the leaf backwards: */
						current = node->i_branch;
						while (XT_NODE_ID(current)) {
							xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
							if (!idx_push(&stack, current, &result.sr_item))
								goto failed;
							current = result.sr_branch;
							if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
								goto failed;
							ind->mi_last_item(ot->ot_table, ind, iref.ir_branch, &result);
							if (!result.sr_item.i_node_ref_size)
								break;
						}

						/* If the leaf is empty we have to go up the stack again... */
						if (result.sr_item.i_total_size == 0)
							goto search_up_stack;

						goto scan_back_in_leaf;
					}
				}

				goto record_found;
			}
		}
		goto done_ok;
	}

	/* We must just step once to the left in this leaf node... */
	ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);

	if (ind->mi_lazy_delete) {
		scan_back_in_leaf:
		while (result.sr_row_id == (xtRowID) -1) {
			if (result.sr_item.i_item_offset == 0)
				goto search_up_stack;
			ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
		}
		idx_still_on_key(ind, search_key, iref.ir_branch, &result.sr_item);
	}

	record_found:
	ot->ot_curr_rec_id = result.sr_rec_id;
	ot->ot_curr_row_id = result.sr_row_id;
	ot->ot_ind_state = result.sr_item;

	/* Convert to handle for later operations: */
	ASSERT_NS(!ot->ot_ind_rhandle);
	if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
		goto failed;
	/* Keep a copy of the node for previous operations... */
	/*
	u_int branch_size;

	branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
	memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
	xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
	*/

	done_ok:
	XT_INDEX_UNLOCK(ind, ot);

#ifdef DEBUG
	//idx_check_index(ot, ind, TRUE);
	//idx_check_on_key(ot);
#endif
	return OK;

	failed:
	XT_INDEX_UNLOCK(ind, ot);
	if (idx_out_of_memory_failure(ot))
		goto retry_after_oom;
	return FAILED;
}
/*
 * Copy the current index value to the record.
 */
xtPublic xtBool xt_idx_read(XTOpenTablePtr ot, XTIndexPtr ind, xtWord1 *rec_buf)
{
	xtWord1	*bitem;

#ifdef DEBUG
	//idx_check_on_key(ot);
#endif
	xt_ind_lock_handle(ot->ot_ind_rhandle);
	bitem = ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset;
	myxt_create_row_from_key(ot, ind, bitem, ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE, rec_buf);
	xt_ind_unlock_handle(ot->ot_ind_rhandle);
	return OK;
}
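/* A sketch (illustrative only, the guard is never defined) of the index
 * item layout that xt_idx_read() relies on: an item is the key bytes
 * followed immediately by an XT_RECORD_REF_SIZE record reference, so the
 * key length is always the item size minus the reference size.
 */
#ifdef XT_EXAMPLE_CODE
static void example_split_item(xtWord1 *item, u_int item_size, xtWord1 **key, u_int *key_len, xtWord1 **rec_ref)
{
	*key = item;								/* key bytes come first... */
	*key_len = item_size - XT_RECORD_REF_SIZE;	/* ...the record reference is the tail */
	*rec_ref = item + *key_len;
}
#endif
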
xtPublic xtBool xt_idx_next(register XTOpenTablePtr ot, register XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
{
	XTIdxKeyValueRec	key_value;
	xtWord1				key_buf[XT_INDEX_MAX_KEY_SIZE];
	XTIdxResultRec		result;
	IdxBranchStackRec	stack;
	xtIndexNodeID		current;
	XTIndReferenceRec	iref;

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
	ASSERT_NS(ot->ot_ind_rhandle);
	xt_ind_lock_handle(ot->ot_ind_rhandle);
	result.sr_item = ot->ot_ind_state;
	if (!result.sr_item.i_node_ref_size &&
		result.sr_item.i_item_offset < result.sr_item.i_total_size &&
		ot->ot_ind_rhandle->ih_cache_reference) {
		XTIdxItemRec prev_item;

		key_value.sv_key = &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset];
		key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;

		prev_item = result.sr_item;
		idx_next_branch_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);

		if (ind->mi_lazy_delete) {
			while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
				if (result.sr_row_id != (xtRowID) -1)
					break;
				prev_item = result.sr_item;
				idx_next_branch_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
			}
		}

		if (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
			/* Still on key? */
			idx_still_on_key(ind, search_key, ot->ot_ind_rhandle->ih_branch, &result.sr_item);
			xt_ind_unlock_handle(ot->ot_ind_rhandle);
			goto checked_on_key;
		}

		result.sr_item = prev_item;
	}

	key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
	xt_get_record_ref(&ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size - XT_RECORD_REF_SIZE], &key_value.sv_rec_id, &key_value.sv_row_id);
	key_value.sv_key = key_buf;
	key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
	memcpy(key_buf, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset], key_value.sv_length);
	xt_ind_release_handle(ot->ot_ind_rhandle, TRUE, ot->ot_thread);
	ot->ot_ind_rhandle = NULL;

	retry_after_oom:
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed = 0;
#endif
	idx_newstack(&stack);

	XT_INDEX_READ_LOCK(ind, ot);

	if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
		XT_INDEX_UNLOCK(ind, ot);
		return OK;
	}

	while (XT_NODE_ID(current)) {
		if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
			goto failed;
		ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
		if (result.sr_item.i_node_ref_size) {
			if (result.sr_found) {
				/* If we have found the key in a node: */
				idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);

				/* Go down to the bottom: */
				while (XT_NODE_ID(current)) {
					xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
					if (!idx_push(&stack, current, &result.sr_item))
						goto failed;
					current = result.sr_branch;
					if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
						goto failed;
					idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
					if (!result.sr_item.i_node_ref_size)
						break;
				}

				/* If the leaf is not empty, then we are done... */
				break;
			}
		}
		else {
			/* We have reached the leaf. */
			if (result.sr_found)
				/* If we have found the key in a leaf: */
				idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
			/* If we did not find the key (although we should have), our
			 * position is automatically the next one.
			 */
			break;
		}
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		if (!idx_push(&stack, current, &result.sr_item))
			goto failed;
		current = result.sr_branch;
	}

	if (ind->mi_lazy_delete) {
		ignore_lazy_deleted_items:
		while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
			if (result.sr_row_id != (xtRowID) -1)
				break;
			idx_next_branch_item(NULL, ind, iref.ir_branch, &result);
		}
	}

	/* Check the current position in a leaf: */
	if (result.sr_item.i_item_offset == result.sr_item.i_total_size) {
		/* At the end: */
		IdxStackItemPtr node;

		/* We are at the end of a leaf node.
		 * Go up the stack to find the start position of the next key.
		 * If we find none, then we are at the end of the index.
		 */
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		while ((node = idx_pop(&stack))) {
			if (node->i_pos.i_item_offset < node->i_pos.i_total_size) {
				if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
					goto failed;
				result.sr_item = node->i_pos;
				xt_get_res_record_ref(&iref.ir_branch->tb_data[result.sr_item.i_item_offset + result.sr_item.i_item_size - XT_RECORD_REF_SIZE], &result);

				if (ind->mi_lazy_delete) {
					if (result.sr_row_id == (xtRowID) -1) {
						/* If this node pointer is lazy deleted, then
						 * go down the next branch...
						 */
						idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);

						/* Go down to the bottom: */
						current = node->i_branch;
						while (XT_NODE_ID(current)) {
							xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
							if (!idx_push(&stack, current, &result.sr_item))
								goto failed;
							current = result.sr_branch;
							if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
								goto failed;
							idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
							if (!result.sr_item.i_node_ref_size)
								break;
						}

						/* And scan the leaf... */
						goto ignore_lazy_deleted_items;
					}
				}

				goto unlock_check_on_key;
			}
		}

		/* No more keys: */
		if (search_key)
			search_key->sk_on_key = FALSE;
		ot->ot_curr_rec_id = 0;
		ot->ot_curr_row_id = 0;
		XT_INDEX_UNLOCK(ind, ot);
		return OK;
	}

	unlock_check_on_key:

	ASSERT_NS(!ot->ot_ind_rhandle);
	if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
		goto failed;
	/*
	u_int branch_size;

	branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
	memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
	xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
	*/

	XT_INDEX_UNLOCK(ind, ot);

	/* Still on key? */
	if (search_key && search_key->sk_on_key) {
		/* GOTCHA: As a short-cut I was using a length compare
		 * and a memcmp() here to check whether we are still on
		 * the original search key.
		 * This does not work because it does not take into account
		 * trailing spaces (which are ignored in comparison).
		 * So lengths can be different, but values still equal.
		 *
		 * NOTE: We have to use the original search flags for
		 * this compare.
		 */
		xt_ind_lock_handle(ot->ot_ind_rhandle);
		search_key->sk_on_key = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length,
			search_key->sk_key_value.sv_key, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset]) == 0;
		xt_ind_unlock_handle(ot->ot_ind_rhandle);
	}

	checked_on_key:
	ot->ot_curr_rec_id = result.sr_rec_id;
	ot->ot_curr_row_id = result.sr_row_id;
	ot->ot_ind_state = result.sr_item;

	return OK;

	failed:
	XT_INDEX_UNLOCK(ind, ot);
	if (idx_out_of_memory_failure(ot))
		goto retry_after_oom;
	return FAILED;
}
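/* A minimal stand-alone sketch of the GOTCHA in xt_idx_next() above:
 * under a space-padding (SQL CHAR style) collation, 'abc' and 'abc  ' are
 * equal although their lengths differ, so a length compare plus memcmp()
 * wrongly reports a key change. example_space_pad_cmp() is a hypothetical
 * stand-in for the collation-aware myxt_compare_key(); the guard below is
 * never defined.
 */
#ifdef XT_EXAMPLE_CODE
static int example_space_pad_cmp(const char *a, size_t a_len, const char *b, size_t b_len)
{
	/* Ignore trailing spaces, the way a padding collation does
	 * (simplified; real collations compare weighted characters): */
	while (a_len > 0 && a[a_len - 1] == ' ')
		a_len--;
	while (b_len > 0 && b[b_len - 1] == ' ')
		b_len--;
	if (a_len != b_len)
		return a_len < b_len ? -1 : 1;
	return memcmp(a, b, a_len);
}

/* example_space_pad_cmp("abc", 3, "abc  ", 5) == 0, while the naive test
 * (a_len == b_len && memcmp(a, b, a_len) == 0) says "different".
 */
#endif
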
xtPublic xtBool xt_idx_prev(register XTOpenTablePtr ot, register XTIndexPtr ind, register XTIdxSearchKeyPtr search_key)
{
	XTIdxKeyValueRec	key_value;
	xtWord1				key_buf[XT_INDEX_MAX_KEY_SIZE];
	XTIdxResultRec		result;
	IdxBranchStackRec	stack;
	xtIndexNodeID		current;
	XTIndReferenceRec	iref;
	IdxStackItemPtr		node;

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
	ASSERT_NS(ot->ot_ind_rhandle);
	xt_ind_lock_handle(ot->ot_ind_rhandle);
	result.sr_item = ot->ot_ind_state;
	if (!result.sr_item.i_node_ref_size && result.sr_item.i_item_offset > 0) {
		key_value.sv_key = &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset];
		key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;

		ind->mi_prev_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);

		if (ind->mi_lazy_delete) {
			while (result.sr_row_id == (xtRowID) -1) {
				if (result.sr_item.i_item_offset == 0)
					goto research;
				ind->mi_prev_item(ot->ot_table, ind, ot->ot_ind_rhandle->ih_branch, &result);
			}
		}

		idx_still_on_key(ind, search_key, ot->ot_ind_rhandle->ih_branch, &result.sr_item);

		xt_ind_unlock_handle(ot->ot_ind_rhandle);
		goto checked_on_key;
	}

	research:
	key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
	key_value.sv_rec_id = ot->ot_curr_rec_id;
	key_value.sv_row_id = 0;
	key_value.sv_key = key_buf;
	key_value.sv_length = result.sr_item.i_item_size - XT_RECORD_REF_SIZE;
	memcpy(key_buf, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset], key_value.sv_length);
	xt_ind_release_handle(ot->ot_ind_rhandle, TRUE, ot->ot_thread);
	ot->ot_ind_rhandle = NULL;

	retry_after_oom:
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed = 0;
#endif
	idx_newstack(&stack);

	XT_INDEX_READ_LOCK(ind, ot);

	if (!(XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
		XT_INDEX_UNLOCK(ind, ot);
		return OK;
	}

	while (XT_NODE_ID(current)) {
		if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
			goto failed;
		ind->mi_scan_branch(ot->ot_table, ind, iref.ir_branch, &key_value, &result);
		if (result.sr_item.i_node_ref_size) {
			if (result.sr_found) {
				/* If we have found the key in a node: */

				search_down_stack:
				/* Go down to the bottom: */
				while (XT_NODE_ID(current)) {
					xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
					if (!idx_push(&stack, current, &result.sr_item))
						goto failed;
					current = result.sr_branch;
					if (!xt_ind_fetch(ot, ind, current, XT_LOCK_READ, &iref))
						goto failed;
					ind->mi_last_item(ot->ot_table, ind, iref.ir_branch, &result);
					if (!result.sr_item.i_node_ref_size)
						break;
				}

				/* If the leaf is empty we have to go up the stack again... */
				if (result.sr_item.i_total_size == 0)
					break;

				if (ind->mi_lazy_delete) {
					while (result.sr_row_id == (xtRowID) -1) {
						if (result.sr_item.i_item_offset == 0)
							goto search_up_stack;
						ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
					}
				}

				goto unlock_check_on_key;
			}
		}
		else {
			/* We have reached the leaf.
			 * Whether we found the key or not, we have
			 * to move one to the left.
			 */
			if (result.sr_item.i_item_offset == 0)
				break;
			ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);

			if (ind->mi_lazy_delete) {
				while (result.sr_row_id == (xtRowID) -1) {
					if (result.sr_item.i_item_offset == 0)
						goto search_up_stack;
					ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);
				}
			}

			goto unlock_check_on_key;
		}
		xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
		if (!idx_push(&stack, current, &result.sr_item))
			goto failed;
		current = result.sr_branch;
	}

	search_up_stack:
	/* We are at the start of a leaf node.
	 * Go up the stack to find the position of the previous key.
	 * If we find none, then we are at the start of the index.
	 */
	xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
	while ((node = idx_pop(&stack))) {
		if (node->i_pos.i_item_offset > node->i_pos.i_node_ref_size) {
			if (!xt_ind_fetch(ot, ind, node->i_branch, XT_LOCK_READ, &iref))
				goto failed;
			result.sr_item = node->i_pos;
			ind->mi_prev_item(ot->ot_table, ind, iref.ir_branch, &result);

			if (ind->mi_lazy_delete) {
				if (result.sr_row_id == (xtRowID) -1) {
					current = node->i_branch;
					goto search_down_stack;
				}
			}

			goto unlock_check_on_key;
		}
	}

	/* No more keys: */
	if (search_key)
		search_key->sk_on_key = FALSE;
	ot->ot_curr_rec_id = 0;
	ot->ot_curr_row_id = 0;

	XT_INDEX_UNLOCK(ind, ot);
	return OK;

	unlock_check_on_key:
	ASSERT_NS(!ot->ot_ind_rhandle);
	if (!(ot->ot_ind_rhandle = xt_ind_get_handle(ot, ind, &iref)))
		goto failed;
	/*
	u_int branch_size;

	branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref.ir_branch->tb_size_2));
	memcpy(&ot->ot_ind_rbuf, iref.ir_branch, branch_size);
	xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
	*/

	XT_INDEX_UNLOCK(ind, ot);

	/* Still on key? */
	if (search_key && search_key->sk_on_key) {
		xt_ind_lock_handle(ot->ot_ind_rhandle);
		search_key->sk_on_key = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length,
			search_key->sk_key_value.sv_key, &ot->ot_ind_rhandle->ih_branch->tb_data[result.sr_item.i_item_offset]) == 0;
		xt_ind_unlock_handle(ot->ot_ind_rhandle);
	}

	checked_on_key:
	ot->ot_curr_rec_id = result.sr_rec_id;
	ot->ot_curr_row_id = result.sr_row_id;
	ot->ot_ind_state = result.sr_item;
	return OK;

	failed:
	XT_INDEX_UNLOCK(ind, ot);
	if (idx_out_of_memory_failure(ot))
		goto retry_after_oom;
	return FAILED;
}
/* Return TRUE if the record matches the current index search! */
xtPublic xtBool xt_idx_match_search(register XTOpenTablePtr XT_UNUSED(ot), register XTIndexPtr ind, register XTIdxSearchKeyPtr search_key, xtWord1 *buf, int mode)
{
	int		r;
	xtWord1	key_buf[XT_INDEX_MAX_KEY_SIZE];

	myxt_create_key_from_row(ind, key_buf, (xtWord1 *) buf, NULL);
	r = myxt_compare_key(ind, search_key->sk_key_value.sv_flags, search_key->sk_key_value.sv_length, search_key->sk_key_value.sv_key, key_buf);
	switch (mode) {
		case XT_S_MODE_MATCH:
			return r == 0;
		case XT_S_MODE_NEXT:
			return r <= 0;
		case XT_S_MODE_PREV:
			return r >= 0;
	}
	return FALSE;
}
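/* How the public scan functions above combine, as a hedged sketch (the
 * guard is never defined; error paths and key preparation are omitted):
 * position with xt_idx_search(), then step with xt_idx_next() until it
 * reports no more keys by clearing ot->ot_curr_rec_id.
 */
#ifdef XT_EXAMPLE_CODE
static void example_forward_scan(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxSearchKeyPtr search_key, xtWord1 *rec_buf)
{
	if (!xt_idx_search(ot, ind, search_key))
		return;
	while (ot->ot_curr_rec_id) {
		if (!xt_idx_read(ot, ind, rec_buf))
			break;
		/* ... process rec_buf here, and stop when past the range ... */
		if (!xt_idx_next(ot, ind, search_key))
			break;
	}
}
#endif
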
static void idx_set_index_selectivity(XTOpenTablePtr ot, XTIndexPtr ind, XTThreadPtr thread)
{
	static const xtRecordID MAX_RECORDS = 100;

	XTIdxSearchKeyRec	search_key;
	XTIndexSegPtr		key_seg;
	u_int				select_count[2] = {0, 0};
	xtWord1				key_buf[XT_INDEX_MAX_KEY_SIZE];
	u_int				key_len;
	xtWord1				*next_key_buf;
	u_int				next_key_len;
	u_int				curr_len;
	u_int				diff;
	u_int				j, i;
	/* these 2 vars are used to check for overlap if we have < 200 records */
	xtRecordID			last_rec = 0;		/* last record accounted in this iteration */
	xtRecordID			last_iter_rec = 0;	/* last record accounted in the previous iteration */

	xtBool	(* xt_idx_iterator[2])(
		register struct XTOpenTable *ot, register struct XTIndex *ind, register XTIdxSearchKeyPtr search_key) = {

		xt_idx_next,
		xt_idx_prev
	};

	xtBool	(* xt_idx_begin[2])(
		struct XTOpenTable *ot, struct XTIndex *ind, register XTIdxSearchKeyPtr search_key) = {

		xt_idx_search,
		xt_idx_search_prev
	};

	ind->mi_select_total = 0;
	key_seg = ind->mi_seg;
	for (i=0; i < ind->mi_seg_count; key_seg++, i++) {
		key_seg->is_selectivity = 1;
		key_seg->is_recs_in_range = 1;
	}

	for (j=0; j < 2; j++) {
		xt_idx_prep_key(ind, &search_key, j == 0 ? XT_SEARCH_FIRST_FLAG : XT_SEARCH_AFTER_LAST_FLAG, NULL, 0);
		if (!(xt_idx_begin[j])(ot, ind, &search_key))
			goto failed;

		/* Initialize the buffer with the first valid index entry: */
		while (!select_count[j] && ot->ot_curr_rec_id != last_iter_rec) {
			if (ot->ot_curr_row_id) {
				select_count[j]++;
				last_rec = ot->ot_curr_rec_id;

				key_len = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
				xt_ind_lock_handle(ot->ot_ind_rhandle);
				memcpy(key_buf, ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset, key_len);
				xt_ind_unlock_handle(ot->ot_ind_rhandle);
			}
			if (!(xt_idx_iterator[j])(ot, ind, &search_key))
				goto failed_1;
		}

		while (select_count[j] < MAX_RECORDS && ot->ot_curr_rec_id != last_iter_rec) {
			/* Check if the index entry is committed: */
			if (ot->ot_curr_row_id) {
				xt_ind_lock_handle(ot->ot_ind_rhandle);
				select_count[j]++;
				last_rec = ot->ot_curr_rec_id;

				next_key_len = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
				next_key_buf = ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset;

				curr_len = 0;
				diff = FALSE;
				key_seg = ind->mi_seg;
				for (i=0; i < ind->mi_seg_count; key_seg++, i++) {
					curr_len += myxt_key_seg_length(key_seg, curr_len, key_buf);
					if (!diff && myxt_compare_key(ind, 0, curr_len, key_buf, next_key_buf) != 0)
						diff = i+1;
					if (diff)
						key_seg->is_selectivity++;
				}

				/* Store the key for the next comparison: */
				key_len = next_key_len;
				memcpy(key_buf, next_key_buf, key_len);
				xt_ind_unlock_handle(ot->ot_ind_rhandle);
			}

			if (!(xt_idx_iterator[j])(ot, ind, &search_key))
				goto failed_1;
		}

		last_iter_rec = last_rec;

		if (ot->ot_ind_rhandle) {
			xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread);
			ot->ot_ind_rhandle = NULL;
		}
	}

	u_int select_total;

	select_total = select_count[0] + select_count[1];
	if (select_total) {
		u_int recs;

		ind->mi_select_total = select_total;
		key_seg = ind->mi_seg;
		for (i=0; i < ind->mi_seg_count; key_seg++, i++) {
			recs = (u_int) ((double) select_total / (double) key_seg->is_selectivity + (double) 0.5);
			key_seg->is_recs_in_range = recs ? recs : 1;
		}
	}
	return;

	failed_1:
	if (ot->ot_ind_rhandle) {
		xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread);
		ot->ot_ind_rhandle = NULL;
	}

	failed:
	xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
	xt_log_and_clear_exception_ns();
	return;
}

xtPublic void xt_ind_set_index_selectivity(XTOpenTablePtr ot, XTThreadPtr thread)
{
	XTTableHPtr		tab = ot->ot_table;
	XTIndexPtr		*ind;
	u_int			i;
	time_t			now;

	now = time(NULL);
	xt_lock_mutex_ns(&tab->tab_ind_stat_lock);
	if (tab->tab_ind_stat_calc_time < now) {
		if (!tab->tab_dic.dic_disable_index) {
			for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++)
				idx_set_index_selectivity(ot, *ind, thread);
		}
		tab->tab_ind_stat_calc_time = time(NULL);
	}
	xt_unlock_mutex_ns(&tab->tab_ind_stat_lock);
}
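/* A worked sketch of the arithmetic above (the guard is never defined): if
 * the two passes sampled select_total == 200 entries and the first key
 * segment changed value on 49 of the comparisons, is_selectivity is
 * 1 + 49 == 50, and is_recs_in_range becomes 200 / 50 rounded == 4, i.e.
 * on average 4 index entries share each distinct value of that segment.
 */
#ifdef XT_EXAMPLE_CODE
static u_int example_recs_in_range(u_int select_total, u_int selectivity)
{
	u_int recs = (u_int) ((double) select_total / (double) selectivity + 0.5);

	return recs ? recs : 1;	/* never report fewer than one row per value */
}
#endif
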
/*
 * -----------------------------------------------------------------------
 * Print a b-tree
 */

#ifdef TEST_CODE
static void idx_check_on_key(XTOpenTablePtr ot)
{
	u_int		offs = ot->ot_ind_state.i_item_offset + ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
	xtRecordID	rec_id;
	xtRowID		row_id;

	if (ot->ot_curr_rec_id && ot->ot_ind_state.i_item_offset < ot->ot_ind_state.i_total_size) {
		xt_get_record_ref(&ot->ot_ind_rbuf.tb_data[offs], &rec_id, &row_id);

		ASSERT_NS(rec_id == ot->ot_curr_rec_id);
	}
}
#endif

static void idx_check_space(int
#ifdef DUMP_INDEX
depth
#endif
)
{
#ifdef DUMP_INDEX
	for (int i=0; i<depth; i++)
		printf(". ");
#endif
}
#ifdef DO_COMP_TEST

//#define FILL_COMPRESS_BLOCKS

#ifdef FILL_COMPRESS_BLOCKS
#define COMPRESS_BLOCK_SIZE			(1024 * 128)
#else
#define COMPRESS_BLOCK_SIZE			16384
#endif

int blocks;
int usage_total;

int zlib_1024;
int zlib_2048;
int zlib_4096;
int zlib_8192;
int zlib_16384;
int zlib_total;
int zlib_time;
int read_time;
int uncomp_time;
int fill_size;
int filled_size;
unsigned char out[COMPRESS_BLOCK_SIZE];
unsigned char uncomp[COMPRESS_BLOCK_SIZE];
xtWord1 precomp[COMPRESS_BLOCK_SIZE+16000];

u_int idx_precompress(XTIndexPtr ind, u_int node_ref_size, u_int item_size, u_int insize, xtWord1 *in_data, xtWord1 *out_data)
{
	xtWord1 *prev_item = NULL;
	xtWord1 *in_ptr = in_data;
	xtWord1 *out_ptr = out_data;
	u_int	out_size = 0;
	u_int	same_size;

	if (insize >= node_ref_size) {
		memcpy(out_ptr, in_ptr, node_ref_size);
		insize -= node_ref_size;
		out_size += node_ref_size;
		in_ptr += node_ref_size;
		out_ptr += node_ref_size;
	}

	while (insize >= item_size + node_ref_size) {
		if (prev_item) {
			same_size = 0;
			while (same_size < item_size + node_ref_size && *prev_item == *in_ptr) {
				same_size++;
				prev_item++;
				in_ptr++;
			}
			ASSERT_NS(same_size < 256);
			*out_ptr = (xtWord1) same_size;
			out_size++;
			out_ptr++;
			same_size = item_size + node_ref_size - same_size;
			memcpy(out_ptr, in_ptr, same_size);
			out_size += same_size;
			out_ptr += same_size;
			in_ptr += same_size;
			prev_item += same_size;
		}
		else {
			prev_item = in_ptr;
			memcpy(out_ptr, in_ptr, item_size + node_ref_size);
			out_size += item_size + node_ref_size;
			out_ptr += item_size + node_ref_size;
			in_ptr += item_size + node_ref_size;
		}
		insize -= (item_size + node_ref_size);
	}
	return out_size;
}

u_int idx_compress(u_int insize, xtWord1 *in_data, u_int outsize, xtWord1 *out_data)
{
	z_stream strm;
	int ret;

	strm.zalloc = Z_NULL;
	strm.zfree = Z_NULL;
	strm.opaque = Z_NULL;
	strm.avail_in = 0;
	strm.next_in = Z_NULL;
	ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION);
	strm.avail_out = outsize;
	strm.next_out = out_data;
	strm.avail_in = insize;
	strm.next_in = in_data;
	ret = deflate(&strm, Z_FINISH);
	deflateEnd(&strm);
	return outsize - strm.avail_out;

/*
	bz_stream strm;
	int ret;

	memset(&strm, 0, sizeof(strm));

	ret = BZ2_bzCompressInit(&strm, 1, 0, 0);
	strm.avail_out = outsize;
	strm.next_out = (char *) out_data;
	strm.avail_in = insize;
	strm.next_in = (char *) in_data;
	ret = BZ2_bzCompress(&strm, BZ_FINISH);

	BZ2_bzCompressEnd(&strm);
	return outsize - strm.avail_out;
*/
}

u_int idx_decompress(u_int insize, xtWord1 *in_data, u_int outsize, xtWord1 *out_data)
{
	z_stream strm;
	int ret;

	strm.zalloc = Z_NULL;
	strm.zfree = Z_NULL;
	strm.opaque = Z_NULL;
	strm.avail_in = 0;
	strm.next_in = Z_NULL;
	ret = inflateInit(&strm);
	strm.avail_out = outsize;
	strm.next_out = out_data;
	strm.avail_in = insize;
	strm.next_in = in_data;
	ret = inflate(&strm, Z_FINISH);
	inflateEnd(&strm);
	return outsize - strm.avail_out;

/*
	bz_stream strm;
	int ret;

	memset(&strm, 0, sizeof(strm));

	ret = BZ2_bzDecompressInit(&strm, 0, 0);
	strm.avail_out = outsize;
	strm.next_out = (char *) out_data;
	strm.avail_in = insize;
	strm.next_in = (char *) in_data;
	ret = BZ2_bzDecompress(&strm);

	BZ2_bzDecompressEnd(&strm);
	return outsize - strm.avail_out;
*/
}
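/* A minimal round-trip check for the two helpers above (illustrative test
 * code only, the guard is never defined; assumes DO_COMP_TEST is active so
 * idx_compress()/idx_decompress() exist): compress a buffer, expand it
 * again, and verify that the original bytes come back.
 */
#ifdef XT_EXAMPLE_CODE
static int example_comp_roundtrip(xtWord1 *in_data, u_int insize)
{
	static xtWord1	comp_buf[COMPRESS_BLOCK_SIZE];
	static xtWord1	rest_buf[COMPRESS_BLOCK_SIZE];
	u_int			comp_size, rest_size;

	comp_size = idx_compress(insize, in_data, sizeof(comp_buf), comp_buf);
	rest_size = idx_decompress(comp_size, comp_buf, sizeof(rest_buf), rest_buf);
	return rest_size == insize && memcmp(in_data, rest_buf, insize) == 0;
}
#endif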
#endif // DO_COMP_TEST

static u_int idx_check_node(XTOpenTablePtr ot, XTIndexPtr ind, int depth, xtIndexNodeID node)
{
	XTIdxResultRec		result;
	u_int				block_count = 1;
	XTIndReferenceRec	iref;

#ifdef DO_COMP_TEST
	unsigned comp_size;
	unsigned uncomp_size;
	xtWord8 now;
	xtWord8 time;
#endif

#ifdef DEBUG
	iref.ir_xlock = 2;
	iref.ir_updated = 2;
#endif
	ASSERT_NS(XT_NODE_ID(node) <= XT_NODE_ID(ot->ot_table->tab_ind_eof));
#ifdef DO_COMP_TEST
	now = xt_trace_clock();
#endif
	/* A deadlock can occur when taking a read lock
	 * because the XT_IPAGE_WRITE_TRY_LOCK(&block->cb_lock, ot->ot_thread->t_id)
	 * only takes into account WRITE locks.
	 * So, if we hold a READ lock on a page, and ind_free_block() tries to
	 * free the block, it hangs on its own read lock!
	 *
	 * So we change from READ lock to a WRITE lock.
	 * If this is too restrictive, then the locks need to handle TRY on a
	 * read lock as well.
	 *
	 * #3	0x00e576b6 in xt_yield at thread_xt.cc:1351
	 * #4	0x00e7218e in xt_spinxslock_xlock at lock_xt.cc:1467
	 * #5	0x00dee1a9 in ind_free_block at cache_xt.cc:901
	 * #6	0x00dee500 in ind_cac_free_lru_blocks at cache_xt.cc:1054
	 * #7	0x00dee88c in ind_cac_fetch at cache_xt.cc:1151
	 * #8	0x00def6d4 in xt_ind_fetch at cache_xt.cc:1480
	 * #9	0x00e1ce2e in idx_check_node at index_xt.cc:3996
	 * #10	0x00e1cf2b in idx_check_node at index_xt.cc:4106
	 * #11	0x00e1cf2b in idx_check_node at index_xt.cc:4106
	 * #12	0x00e1cfdc in idx_check_index at index_xt.cc:4130
	 * #13	0x00e1d11c in xt_check_indices at index_xt.cc:4181
	 * #14	0x00e4aa82 in xt_check_table at table_xt.cc:2363
	 */
	if (!xt_ind_fetch(ot, ind, node, XT_LOCK_WRITE, &iref))
		return 0;
#ifdef DO_COMP_TEST
	time = xt_trace_clock() - now;
	read_time += time;
#endif

	idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
	ASSERT_NS(result.sr_item.i_total_size + offsetof(XTIdxBranchDRec, tb_data) <= XT_INDEX_PAGE_SIZE);

#ifdef DO_COMP_TEST
	u_int size = result.sr_item.i_total_size;
	xtWord1 *data = iref.ir_branch->tb_data;

/*
	size = idx_precompress(ind, result.sr_item.i_node_ref_size, result.sr_item.i_item_size, size, data, precomp);
	if (size > result.sr_item.i_total_size)
		size = result.sr_item.i_total_size;
	else
		data = precomp;
*/

	blocks++;
	usage_total += result.sr_item.i_total_size;

#ifdef FILL_COMPRESS_BLOCKS
	if (fill_size + size > COMPRESS_BLOCK_SIZE) {
		now = xt_trace_clock();
		comp_size = idx_compress(fill_size, precomp, COMPRESS_BLOCK_SIZE, out);
		time = xt_trace_clock() - now;
		zlib_time += time;

		zlib_total += comp_size;
		filled_size += fill_size;

		now = xt_trace_clock();
		uncomp_size = idx_decompress(comp_size, out, COMPRESS_BLOCK_SIZE, uncomp);
		time = xt_trace_clock() - now;
		uncomp_time += time;

		if (uncomp_size != fill_size)
			printf("what?\n");

		fill_size = 0;
	}
	memcpy(precomp + fill_size, data, size);
	fill_size += size;
#else
	now = xt_trace_clock();
	comp_size = idx_compress(size, data, COMPRESS_BLOCK_SIZE, out);
	time = xt_trace_clock() - now;
	zlib_time += time;
	zlib_total += comp_size;

	now = xt_trace_clock();
	uncomp_size = idx_decompress(comp_size, out, COMPRESS_BLOCK_SIZE, uncomp);
	time = xt_trace_clock() - now;
	uncomp_time += time;
	if (uncomp_size != size)
		printf("what?\n");
#endif

	if (comp_size <= 1024)
		zlib_1024++;
	else if (comp_size <= 2048)
		zlib_2048++;
	else if (comp_size <= 4096)
		zlib_4096++;
	else if (comp_size <= 8192)
		zlib_8192++;
	else
		zlib_16384++;

#endif // DO_COMP_TEST

	if (result.sr_item.i_node_ref_size) {
		idx_check_space(depth);
#ifdef DUMP_INDEX
		printf("%04d -->\n", (int) XT_NODE_ID(result.sr_branch));
#endif
#ifdef TRACK_ACTIVITY
		track_block_exists(result.sr_branch);
#endif
		block_count += idx_check_node(ot, ind, depth+1, result.sr_branch);
	}

	while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
#ifdef CHECK_PRINTS_RECORD_REFERENCES
		idx_check_space(depth);
		if (result.sr_item.i_item_size == 12) {
			/* Assume this is a NOT-NULL INT!: */
			xtWord4 val = XT_GET_DISK_4(&iref.ir_branch->tb_data[result.sr_item.i_item_offset]);
#ifdef DUMP_INDEX
			printf("(%6d) ", (int) val);
#endif
		}
#ifdef DUMP_INDEX
		printf("rec=%d row=%d ", (int) result.sr_rec_id, (int) result.sr_row_id);
		printf("\n");
#endif
#endif
		idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
		if (result.sr_item.i_node_ref_size) {
			idx_check_space(depth);
#ifdef DUMP_INDEX
			printf("%04d -->\n", (int) XT_NODE_ID(result.sr_branch));
#endif
#ifdef TRACK_ACTIVITY
			track_block_exists(result.sr_branch);
#endif
			block_count += idx_check_node(ot, ind, depth+1, result.sr_branch);
		}
	}

	xt_ind_release(ot, ind, XT_UNLOCK_WRITE, &iref);
	return block_count;
}
static u_int idx_check_index(XTOpenTablePtr ot, XTIndexPtr ind, xtBool with_lock)
{
	xtIndexNodeID			current;
	u_int					block_count = 0;
	u_int					i;

	if (with_lock)
		XT_INDEX_WRITE_LOCK(ind, ot);

#ifdef DUMP_INDEX
	printf("INDEX (%d) %04d ---------------------------------------\n", (int) ind->mi_index_no, (int) XT_NODE_ID(ind->mi_root));
#endif
	if ((XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root))) {
#ifdef TRACK_ACTIVITY
		track_block_exists(ind->mi_root);
#endif
		block_count = idx_check_node(ot, ind, 0, current);
	}

	if (ind->mi_free_list && ind->mi_free_list->fl_free_count) {
#ifdef DUMP_INDEX
		printf("INDEX (%d) FREE ---------------------------------------", (int) ind->mi_index_no);
#endif
		ASSERT_NS(ind->mi_free_list->fl_start == 0);
		for (i=0; i<ind->mi_free_list->fl_free_count; i++) {
#ifdef DUMP_INDEX
			if ((i % 40) == 0)
				printf("\n");
#endif
			block_count++;
#ifdef TRACK_ACTIVITY
			track_block_exists(ind->mi_free_list->fl_page_id[i]);
#endif
#ifdef DUMP_INDEX
			printf("%2d ", (int) XT_NODE_ID(ind->mi_free_list->fl_page_id[i]));
#endif
		}
#ifdef DUMP_INDEX
		if ((i % 40) != 0)
			printf("\n");
#endif
	}

	if (with_lock)
		XT_INDEX_UNLOCK(ind, ot);
	return block_count;

}
xtPublic void xt_check_indices(XTOpenTablePtr ot)
{
	register XTTableHPtr	tab = ot->ot_table;
	XTIndexPtr				*ind;
	xtIndexNodeID			current;
	XTIndFreeBlockRec		free_block;
	u_int					ind_count, block_count = 0;
	u_int					free_count = 0;
	u_int					i, j;

	xt_lock_mutex_ns(&tab->tab_ind_flush_lock);
	printf("CHECK INDICES %s ==============================\n", tab->tab_name->ps_path);
#ifdef TRACK_ACTIVITY
	track_reset_missing();
#endif

	ind = tab->tab_dic.dic_keys;
	for (u_int k=0; k<tab->tab_dic.dic_key_count; k++, ind++) {
		ind_count = idx_check_index(ot, *ind, TRUE);
		block_count += ind_count;
	}

#ifdef DO_COMP_TEST
	int block_total;

#ifdef FILL_COMPRESS_BLOCKS
	if (fill_size > 0) {
		unsigned	comp_size;
		unsigned	uncomp_size;
		xtWord8		now;
		xtWord8		time;

		now = xt_trace_clock();
		comp_size = idx_compress(fill_size, precomp, COMPRESS_BLOCK_SIZE, out);
		time = xt_trace_clock() - now;
		zlib_time += time;
		zlib_total += comp_size;
		filled_size += fill_size;

		now = xt_trace_clock();
		uncomp_size = idx_decompress(comp_size, out, COMPRESS_BLOCK_SIZE, uncomp);
		time = xt_trace_clock() - now;
		uncomp_time += time;
	}
	if (filled_size != usage_total)
		printf("What?\n");
#endif

	printf("Total blocks  = %d\n", blocks);
	printf("zlib <=  1024 = %d\n", zlib_1024);
	printf("zlib <=  2048 = %d\n", zlib_2048);
	printf("zlib <=  4096 = %d\n", zlib_4096);
	printf("zlib <=  8192 = %d\n", zlib_8192);
	printf("zlib <= 16384 = %d\n", zlib_16384);
	printf("zlib average size = %.2f\n", (double) zlib_total / (double) blocks);
	printf("zlib average time = %.2f\n", (double) zlib_time / (double) blocks);
	printf("read average time = %.2f\n", (double) read_time / (double) blocks);
	printf("uncompress time   = %.2f\n", (double) uncomp_time / (double) blocks);
	block_total = (zlib_1024 + zlib_2048) * 8192;
	block_total += zlib_4096 * 8192;
	block_total += zlib_8192 * 8192;
	block_total += zlib_16384 * 16384;
	printf("block total       = %d\n", block_total);
	printf("block %% compress  = %.2f\n", ((double) block_total * (double) 100) / ((double) blocks * (double) 16384));
	printf("Total size        = %d\n", blocks * 16384);
	printf("total before zlib = %d\n", usage_total);
	printf("total after zlib  = %d\n", zlib_total);
	printf("zlib %% compress   = %.2f\n", ((double) zlib_total * (double) 100) / (double) usage_total);
	printf("total %% compress  = %.2f\n", ((double) zlib_total * (double) 100) / (double) (blocks * 16384));
#endif

	xt_lock_mutex_ns(&tab->tab_ind_lock);
#ifdef DUMP_INDEX
	printf("\nFREE: ---------------------------------------\n");
#endif
	if (tab->tab_ind_free_list) {
		XTIndFreeListPtr	ptr;

		ptr = tab->tab_ind_free_list;
		while (ptr) {
#ifdef DUMP_INDEX
			printf("Memory List:");
#endif
			i = 0;
			for (j=ptr->fl_start; j<ptr->fl_free_count; j++, i++) {
#ifdef DUMP_INDEX
				if ((i % 40) == 0)
					printf("\n");
#endif
				free_count++;
#ifdef TRACK_ACTIVITY
				track_block_exists(ptr->fl_page_id[j]);
#endif
#ifdef DUMP_INDEX
				printf("%2d ", (int) XT_NODE_ID(ptr->fl_page_id[j]));
#endif
			}
#ifdef DUMP_INDEX
			if ((i % 40) != 0)
				printf("\n");
#endif
			ptr = ptr->fl_next_list;
		}
	}

	current = tab->tab_ind_free;
	if (XT_NODE_ID(current)) {
		u_int k = 0;
#ifdef DUMP_INDEX
		printf("Disk List:");
#endif
		while (XT_NODE_ID(current)) {
#ifdef DUMP_INDEX
			if ((k % 40) == 0)
				printf("\n");
#endif
			free_count++;
#ifdef TRACK_ACTIVITY
			track_block_exists(current);
#endif
#ifdef DUMP_INDEX
			printf("%d ", (int) XT_NODE_ID(current));
#endif
			if (!xt_ind_read_bytes(ot, *ind, current, sizeof(XTIndFreeBlockRec), (xtWord1 *) &free_block)) {
				xt_log_and_clear_exception_ns();
				break;
			}
			XT_NODE_ID(current) = (xtIndexNodeID) XT_GET_DISK_8(free_block.if_next_block_8);
			k++;
		}
#ifdef DUMP_INDEX
		if ((k % 40) != 0)
			printf("\n");
#endif
	}
#ifdef DUMP_INDEX
	printf("\n-----------------------------\n");
	printf("used blocks %d + free blocks %d = %d\n", block_count, free_count, block_count + free_count);
	printf("EOF = %"PRIu64", total blocks = %d\n", (xtWord8) xt_ind_node_to_offset(tab, tab->tab_ind_eof), (int) (XT_NODE_ID(tab->tab_ind_eof) - 1));
	printf("-----------------------------\n");
#endif
	xt_unlock_mutex_ns(&tab->tab_ind_lock);
#ifdef TRACK_ACTIVITY
	track_dump_missing(tab->tab_ind_eof);
	printf("===================================================\n");
	track_dump_all((u_int) (XT_NODE_ID(tab->tab_ind_eof) - 1));
#endif
	printf("===================================================\n");
	xt_unlock_mutex_ns(&tab->tab_ind_flush_lock);
}
/*
 * -----------------------------------------------------------------------
 * Load index
 */

static void idx_load_node(XTThreadPtr self, XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID node)
{
	XTIdxResultRec		result;
	XTIndReferenceRec	iref;

	ASSERT_NS(XT_NODE_ID(node) <= XT_NODE_ID(ot->ot_table->tab_ind_eof));
	if (!xt_ind_fetch(ot, ind, node, XT_LOCK_READ, &iref))
		xt_throw(self);

	idx_first_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
	if (result.sr_item.i_node_ref_size)
		idx_load_node(self, ot, ind, result.sr_branch);
	while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
		idx_next_branch_item(ot->ot_table, ind, iref.ir_branch, &result);
		if (result.sr_item.i_node_ref_size)
			idx_load_node(self, ot, ind, result.sr_branch);
	}

	xt_ind_release(ot, ind, XT_UNLOCK_READ, &iref);
}

xtPublic void xt_load_indices(XTThreadPtr self, XTOpenTablePtr ot)
{
	register XTTableHPtr	tab = ot->ot_table;
	XTIndexPtr				*ind_ptr;
	XTIndexPtr				ind;
	xtIndexNodeID			current;

	xt_lock_mutex(self, &tab->tab_ind_flush_lock);
	pushr_(xt_unlock_mutex, &tab->tab_ind_flush_lock);

	ind_ptr = tab->tab_dic.dic_keys;
	for (u_int k=0; k<tab->tab_dic.dic_key_count; k++, ind_ptr++) {
		ind = *ind_ptr;
		XT_INDEX_WRITE_LOCK(ind, ot);
		if ((XT_NODE_ID(current) = XT_NODE_ID(ind->mi_root)))
			idx_load_node(self, ot, ind, current);
		XT_INDEX_UNLOCK(ind, ot);
	}

	freer_(); // xt_unlock_mutex(&tab->tab_ind_flush_lock)
}
/*
 * -----------------------------------------------------------------------
 * Count the number of deleted entries in a node:
 */

/*
 * {LAZY-DEL-INDEX-ITEMS}
 *
 * Use this function to count the number of deleted items
 * in a node when it is loaded.
 *
 * The count helps us decide whether the node should be "packed".
 */
xtPublic void xt_ind_count_deleted_items(XTTableHPtr tab, XTIndexPtr ind, XTIndBlockPtr block)
{
	XTIdxResultRec		result;
	int					del_count = 0;
	xtWord2				branch_size;

	branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);

	/* This is possible when reading free pages. */
	if (XT_GET_INDEX_BLOCK_LEN(branch_size) < 2 || XT_GET_INDEX_BLOCK_LEN(branch_size) > XT_INDEX_PAGE_SIZE)
		return;

	idx_first_branch_item(tab, ind, (XTIdxBranchDPtr) block->cb_data, &result);
	while (result.sr_item.i_item_offset < result.sr_item.i_total_size) {
		if (result.sr_row_id == (xtRowID) -1)
			del_count++;
		idx_next_branch_item(tab, ind, (XTIdxBranchDPtr) block->cb_data, &result);
	}
	block->cp_del_count = del_count;
}
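/* Sketch of the convention the counter above supports (the guard is never
 * defined): a lazy delete leaves the key in the node and only overwrites
 * its row ID with the reserved value (xtRowID) -1; scans skip such items,
 * and cp_del_count tells us when packing the node is worthwhile.
 */
#ifdef XT_EXAMPLE_CODE
static int example_is_lazy_deleted(xtRowID row_id)
{
	return row_id == (xtRowID) -1;	/* reserved marker, never a real row ID */
}
#endif
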
/*
 * -----------------------------------------------------------------------
 * Dirty list
 */

xtBool XTIndDirtyList::dl_add_block(XTIndBlockPtr block)
{
	XTIndDirtyBlocksPtr blocks;

	blocks = dl_block_lists;
	if (dl_list_usage == XT_DIRTY_BLOCK_LIST_SIZE || !blocks) {
		if (!(blocks = (XTIndDirtyBlocksPtr) xt_malloc_ns(sizeof(XTIndDirtyBlocksRec))))
			return FAILED;
		dl_list_usage = 0;
		blocks->db_next = dl_block_lists;
		dl_block_lists = blocks;
	}
	blocks->db_blocks[dl_list_usage] = block;
	dl_list_usage++;
	dl_total_blocks++;
	return OK;
}

static int idx_compare_blocks(const void *a, const void *b)
{
	XTIndBlockPtr b_a = *((XTIndBlockPtr *) a);
	XTIndBlockPtr b_b = *((XTIndBlockPtr *) b);

	if (b_a->cb_address == b_b->cb_address)
		return 0;
	if (b_a->cb_address < b_b->cb_address)
		return -1;
	return 1;
}

void XTIndDirtyList::dl_sort_blocks()
{
	XTIndDirtyBlocksPtr blocks;
	size_t				size;

	size = dl_list_usage;
	blocks = dl_block_lists;
	while (blocks) {
		qsort(blocks->db_blocks, size, sizeof(XTIndBlockPtr), idx_compare_blocks);
		blocks = blocks->db_next;
		size = XT_DIRTY_BLOCK_LIST_SIZE;
	}
}

void XTIndDirtyList::dl_free_all()
{
	XTIndDirtyBlocksPtr blocks, n_blocks;

	blocks = dl_block_lists;
	dl_block_lists = NULL;
	dl_total_blocks = 0;
	dl_list_usage = 0;
	while (blocks) {
		n_blocks = blocks->db_next;
		xt_free_ns(blocks);
		blocks = n_blocks;
	}
}
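/* Why dl_sort_blocks() orders by cb_address, as a stand-alone sketch (the
 * guard is never defined; plain types instead of the cache block struct):
 * flushing dirty blocks in ascending file offset turns a random write
 * pattern into a mostly sequential sweep over the index file.
 */
#ifdef XT_EXAMPLE_CODE
static int example_cmp_offset(const void *a, const void *b)
{
	off_t o_a = *(const off_t *) a;
	off_t o_b = *(const off_t *) b;

	if (o_a == o_b)
		return 0;
	return o_a < o_b ? -1 : 1;
}

static void example_sort_for_flush(off_t *offsets, size_t count)
{
	qsort(offsets, count, sizeof(off_t), example_cmp_offset);	/* needs <stdlib.h> */
}
#endif
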
/*
 * -----------------------------------------------------------------------
 * Index consistent flush
 */

xtBool XTFlushIndexTask::tk_task(XTThreadPtr thread)
{
	XTOpenTablePtr		ot;

	fit_dirty_blocks = 0;
	fit_blocks_flushed = 0;

	/* See {TASK-TABLE-GONE} */
	if (!(xt_db_open_pool_table_ns(&ot, fit_table->tab_db, fit_table->tab_id)))
		return FAILED;

	if (!ot) {
		/* Can happen if the table has been dropped: */
		if (thread->t_exception.e_xt_err)
			xt_log_and_clear_exception(thread);
		xt_logf(XT_NT_WARNING, "Checkpoint skipping table (ID) %lu: table was not found\n", (u_long) fit_table->tab_id);
		xt_checkpoint_set_flush_state(fit_table->tab_db, fit_table->tab_id, XT_CPT_STATE_DONE_ALL);
		return OK;
	}

	if (ot->ot_table != fit_table) {
		/* Can happen if the table has been renamed: */
		if (thread->t_exception.e_xt_err)
			xt_log_and_clear_exception(thread);
		xt_logf(XT_NT_WARNING, "Checkpoint skipping table (ID) %lu: table has been renamed\n", (u_long) fit_table->tab_id);
		xt_checkpoint_set_flush_state(fit_table->tab_db, fit_table->tab_id, XT_CPT_STATE_DONE_ALL);
		goto table_gone;
	}

	if (!xt_flush_indices(ot, NULL, FALSE, this)) {
		xt_db_return_table_to_pool_ns(ot);
		return FAILED;
	}

	table_gone:
	xt_db_return_table_to_pool_ns(ot);
	return OK;
}
void XTFlushIndexTask::tk_reference()
{
	xt_heap_reference_ns(fit_table);
}

void XTFlushIndexTask::tk_release()
{
	xt_heap_release_ns(fit_table);
}
/*
 * Set notify_before_write to TRUE if the caller requires
 * notification before the index file is written.
 *
 * This is used if the index is flushed due to a lack of index cache.
 */
xtPublic xtBool xt_async_flush_indices(XTTableHPtr tab, xtBool notify_complete, xtBool notify_before_write, XTThreadPtr thread)
{
	/* Run the task: */
	return xt_run_async_task(tab->tab_ind_flush_task, notify_complete, notify_before_write, thread, tab->tab_db);
}
#if defined(PRINT_IND_FLUSH_STATS) || defined(TRACE_FLUSH_TIMES)

static char *idx_format(char *buffer, double v)
{
	if (v != 0.0) {
		sprintf(buffer, "%9.2f", v);
		if (strcmp(buffer, "      nan") == 0)
			strcpy(buffer, "         ");
	}
	else
		strcpy(buffer, "         ");
	return buffer;
}

static char *idx_format_mb(char *buffer, double v)
{
	if (v != 0.0) {
		sprintf(buffer, "%7.3f", v / (double) 1024);
		if (strcmp(buffer, "    nan") == 0)
			strcpy(buffer, "       ");
	}
	else
		strcpy(buffer, "       ");
	return buffer;
}
#endif
#ifdef TRACE_FLUSH_TIMES

#define ILOG_FLUSH	1
#define INDEX_FLUSH	2

struct idxstats {
	u_int i_log_flush;
	u_int i_log_write;
	u_int idx_flush;
	u_int idx_write;
};

static void idx_print(char *msg, XTThreadPtr thread, struct idxstats *st, xtWord8 *now, int flush)
{
	xtWord8	then, t;
	double	ilogw, idxw;
	double	dilogw, didxw;
	char	buf1[30];
	char	buf2[30];
	char	buf3[30];
	char	buf4[30];

	then = xt_trace_clock();
	t = then - *now;
	ilogw = (double) (thread->st_statistics.st_ilog.ts_write - st->i_log_write) / (double) 1024;
	dilogw = ((double) ilogw * (double) 1000000) / (double) t;
	idxw = (double) (thread->st_statistics.st_ind.ts_write - st->idx_write) / (double) 1024;
	didxw = ((double) idxw * (double) 1000000) / (double) t;

	printf("%26s | TIME: %7d ", msg, (int) t);
	printf("ILOG: %s - %s INDX: %s - %s\n",
		idx_format_mb(buf1, dilogw), idx_format(buf2, ilogw),
		idx_format_mb(buf3, didxw), idx_format(buf4, idxw));
	st->i_log_write = thread->st_statistics.st_ilog.ts_write;
	st->idx_write = thread->st_statistics.st_ind.ts_write;

	switch (flush) {
		case ILOG_FLUSH:
			ilogw = (double) (thread->st_statistics.st_ilog.ts_write - st->i_log_flush) / (double) 1024;
			dilogw = ((double) ilogw * (double) 1000000) / (double) t;
			printf("%26s | TIME: %7s ", " ", " ");
			printf("ILOG: %s - %s INDX: %s - %s\n",
				idx_format_mb(buf1, dilogw), idx_format(buf2, ilogw),
				idx_format_mb(buf3, 0.0), idx_format(buf4, 0.0));
			st->i_log_flush = thread->st_statistics.st_ilog.ts_write;
			break;
		case INDEX_FLUSH:
			idxw = (double) (thread->st_statistics.st_ind.ts_write - st->idx_flush) / (double) 1024;
			didxw = ((double) idxw * (double) 1000000) / (double) t;
			printf("%26s | TIME: %7s ", " ", " ");
			printf("ILOG: %s - %s INDX: %s - %s\n",
				idx_format_mb(buf1, 0.0), idx_format(buf2, 0.0),
				idx_format_mb(buf3, didxw), idx_format(buf4, idxw));
			st->idx_flush = thread->st_statistics.st_ind.ts_write;
			break;
	}

	*now = xt_trace_clock();
}

#define TRACE_FLUSH(a, b, c, d, e)		idx_print(a, b, c, d, e)

#else // TRACE_FLUSH_TIMES

#define TRACE_FLUSH(a, b, c, d, e)

#endif // TRACE_FLUSH_TIMES
/* Flush the indexes of a table.
4683
 * If a ft is given, then this means this is an asynchronous flush.
4684
 */
4685
xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool have_table_lock, XTFlushIndexTask *fit)
4686
{
4687
	register XTTableHPtr	tab = ot->ot_table;
4688
	XTIndexLogPtr			il;
4689
	XTIndexPtr				*indp;
4690
	XTIndexPtr				ind;
4691
	u_int					i, j;
4692
	XTIndBlockPtr			block, fblock;
4693
	xtWord1					*data;
4694
	xtIndexNodeID			ind_free;
4695
	xtBool					block_on_free_list = FALSE;
4696
	xtIndexNodeID			last_address, next_address;
4697
	XTIndFreeListPtr		list_ptr;
4698
	u_int					dirty_blocks;
4699
	XTIndDirtyListItorRec	it;
4700
	//u_int					dirty_count;
4701
#ifdef TRACE_FLUSH_INDEX
4702
	time_t					tnow = 0;
4703
#endif
4704
4705
#ifdef TRACE_FLUSH_TIMES
4706
	XTThreadPtr thread = ot->ot_thread;
4707
	struct idxstats st;
4708
	xtWord8 now;
4709
	st.i_log_flush = thread->st_statistics.st_ilog.ts_write;
4710
	st.i_log_write = thread->st_statistics.st_ilog.ts_write;
4711
	st.idx_flush = thread->st_statistics.st_ind.ts_write;
4712
	st.idx_write = thread->st_statistics.st_ind.ts_write;
4713
	now = xt_trace_clock();
4714
#endif

#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	xt_lock_mutex_ns(&tab->tab_ind_flush_lock);
	TRACE_FLUSH("LOCKED flush index lock", thread, &st, &now, 0);

	if (!xt_begin_checkpoint(tab->tab_db, have_table_lock, ot->ot_thread))
		return FAILED;

	ASSERT_NS(!tab->tab_ind_flush_ilog);
	if (!tab->tab_db->db_indlogs.ilp_get_log(&tab->tab_ind_flush_ilog, ot->ot_thread))
		goto failed_3;
	il = tab->tab_ind_flush_ilog;

	if (!il->il_reset(ot))
		goto failed_2;
	if (!il->il_write_byte(ot, XT_DT_LOG_HEAD))
		goto failed_2;
	if (!il->il_write_word4(ot, tab->tab_id))
		goto failed_2;
	if (!il->il_write_word4(ot, 0))
		goto failed_2;
	TRACE_FLUSH("reset ilog", thread, &st, &now, 0);

	/* Lock all: */
	dirty_blocks = 0;
	indp = tab->tab_dic.dic_keys;
	for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
		ind = *indp;
		XT_INDEX_WRITE_LOCK(ind, ot);
		if (ind->mi_free_list && ind->mi_free_list->fl_free_count)
			block_on_free_list = TRUE;
		dirty_blocks += ind->mi_dirty_blocks;
	}
	TRACE_FLUSH("LOCKED all indexes", thread, &st, &now, 0);

	if (!dirty_blocks && !block_on_free_list) {
		/* Nothing to flush... */
		indp = tab->tab_dic.dic_keys;
		for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
			ind = *indp;
			XT_INDEX_UNLOCK(ind, ot);
		}
		goto flush_done;
	}

#ifdef TRACE_FLUSH_INDEX
	tnow = time(NULL);
	printf("FLUSH INDEX pages=%lu %s\n", (u_long) dirty_blocks, tab->tab_name->ps_path);
#endif

	if (fit)
		fit->fit_dirty_blocks = dirty_blocks;

	// 128 dirty blocks == 2MB
	if (bytes_flushed)
		*bytes_flushed += (dirty_blocks * XT_INDEX_PAGE_SIZE);

	/* Collect the index roots: */
	data = tab->tab_index_head->tp_data;

	/* Collect a complete list of all dirty blocks: */
	indp = tab->tab_dic.dic_keys;
	for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
		ind = *indp;
		xt_spinlock_lock(&ind->mi_dirty_lock);
		if ((block = ind->mi_dirty_list)) {
			while (block) {
				ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_DIRTY);
#ifdef IND_OPT_DATA_WRITTEN
				ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
#endif
				tab->tab_ind_dirty_list.dl_add_block(block);
				fblock = block->cb_dirty_next;
				block->cb_dirty_next = NULL;
				block->cb_dirty_prev = NULL;
				block->cb_state = IDX_CAC_BLOCK_FLUSHING;
				block = fblock;
			}
		}
		//dirty_count = ind->mi_dirty_blocks;
		ind->mi_dirty_blocks = 0;
		ind->mi_dirty_list = NULL;
		xt_spinlock_unlock(&ind->mi_dirty_lock);
		//ot->ot_thread->st_statistics.st_ind_cache_dirty -= dirty_count;
		XT_SET_NODE_REF(tab, data, ind->mi_root);
		data += XT_NODE_REF_SIZE;
	}

	TRACE_FLUSH("Collected all blocks", thread, &st, &now, 0);

	xt_lock_mutex_ns(&tab->tab_ind_lock);
	TRACE_FLUSH("LOCKED table index lock", thread, &st, &now, 0);

	/* Write the free list: */
	if (block_on_free_list) {
		union {
			xtWord1				buffer[XT_BLOCK_SIZE_FOR_DIRECT_IO];
			XTIndFreeBlockRec	free_block;
		} x;
		memset(x.buffer, 0, sizeof(XTIndFreeBlockRec));

		/* The old start of the free list: */
		XT_NODE_ID(ind_free) = 0;
		/* This is a list of lists: */
		while ((list_ptr = tab->tab_ind_free_list)) {
			/* If this free list still has unused blocks,
			 * pick the first. That is the front of
			 * the list of free blocks.
			 */
			if (list_ptr->fl_start < list_ptr->fl_free_count) {
				ind_free = list_ptr->fl_page_id[list_ptr->fl_start];
				break;
			}
			/* This list is empty, free it: */
			tab->tab_ind_free_list = list_ptr->fl_next_list;
			xt_free_ns(list_ptr);
		}
		/* If nothing is on any list, then
		 * take the value stored in the index header.
		 * It is the front of the list on disk.
		 */
		if (!XT_NODE_ID(ind_free))
			ind_free = tab->tab_ind_free;

		if (!il->il_write_byte(ot, XT_DT_FREE_LIST))
			goto failed;
		indp = tab->tab_dic.dic_keys;
		XT_NODE_ID(last_address) = 0;
		for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
			ind = *indp;
			if (ind->mi_free_list && ind->mi_free_list->fl_free_count) {
				for (j=0; j<ind->mi_free_list->fl_free_count; j++) {
					next_address = ind->mi_free_list->fl_page_id[j];
					/* Write out the IDs of the free blocks. */
					if (!il->il_write_word4(ot, XT_NODE_ID(ind->mi_free_list->fl_page_id[j])))
						goto failed;
					if (XT_NODE_ID(last_address)) {
						/* Update the page in cache, if it is in the cache! */
						XT_SET_DISK_8(x.free_block.if_next_block_8, XT_NODE_ID(next_address));
						if (!xt_ind_write_cache(ot, last_address, 8, x.buffer))
							goto failed;
					}
					last_address = next_address;
				}
			}
		}
		if (!il->il_write_word4(ot, XT_NODE_ID(ind_free)))
			goto failed;
		if (XT_NODE_ID(last_address)) {
			XT_SET_DISK_8(x.free_block.if_next_block_8, XT_NODE_ID(tab->tab_ind_free));
			if (!xt_ind_write_cache(ot, last_address, 8, x.buffer))
				goto failed;
		}
		if (!il->il_write_word4(ot, 0xFFFFFFFF))
			goto failed;
	}
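
	/* A reading aid, derived from the code above: the free-list record
	 * written to the ilog is laid out as
	 *
	 *     XT_DT_FREE_LIST                       1 byte
	 *     page ID of each freed block           4 bytes each, all indexes
	 *     page ID of the new free-list front    4 bytes
	 *     0xFFFFFFFF terminator                 4 bytes
	 *
	 * On replay, il_apply_log_write() slides a pair of IDs across the
	 * record and links each block to the ID that follows it, which is why
	 * the new front of the list is written after the freed blocks.
	 */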

	/*
	 * Add the free list caches to the global free list cache.
	 * Added backwards to match the write order.
	 */
	indp = tab->tab_dic.dic_keys + tab->tab_dic.dic_key_count-1;
	for (i=0; i<tab->tab_dic.dic_key_count; i++, indp--) {
		ind = *indp;
		if (ind->mi_free_list) {
			ind->mi_free_list->fl_next_list = tab->tab_ind_free_list;
			tab->tab_ind_free_list = ind->mi_free_list;
		}
		ind->mi_free_list = NULL;
	}

	/*
	 * The new start of the free list is the first
	 * item on the table free list:
	 */
	XT_NODE_ID(ind_free) = 0;
	while ((list_ptr = tab->tab_ind_free_list)) {
		if (list_ptr->fl_start < list_ptr->fl_free_count) {
			ind_free = list_ptr->fl_page_id[list_ptr->fl_start];
			break;
		}
		tab->tab_ind_free_list = list_ptr->fl_next_list;
		xt_free_ns(list_ptr);
	}
	if (!XT_NODE_ID(ind_free))
		ind_free = tab->tab_ind_free;
	TRACE_FLUSH("did free block stuff", thread, &st, &now, 0);
	xt_unlock_mutex_ns(&tab->tab_ind_lock);

	XT_SET_DISK_6(tab->tab_index_head->tp_ind_eof_6, XT_NODE_ID(tab->tab_ind_eof));
	XT_SET_DISK_6(tab->tab_index_head->tp_ind_free_6, XT_NODE_ID(ind_free));

	TRACE_FLUSH("UN-LOCKED table index lock", thread, &st, &now, 0);
	if (!il->il_write_header(ot, XT_INDEX_HEAD_SIZE, (xtWord1 *) tab->tab_index_head))
		goto failed;

	TRACE_FLUSH("wrote ilog header", thread, &st, &now, 0);
	indp = tab->tab_dic.dic_keys;
	for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
		ind = *indp;
		XT_INDEX_UNLOCK(ind, ot);
	}

	TRACE_FLUSH("UN-LOCKED all indexes", thread, &st, &now, 0);

	/* Write all blocks to the index log: */
	if (tab->tab_ind_dirty_list.dl_total_blocks) {
		tab->tab_ind_dirty_list.dl_sort_blocks();
		it.dli_reset();
		while ((block = tab->tab_ind_dirty_list.dl_next_block(&it))) {
			XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
			if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
				if (!il->il_write_block(ot, block)) {
					XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
					goto failed_2;
				}
			}
			XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
		}
	}

	/* {PAGE-NO-IN-INDEX-FILE}
	 * At this point all blocks have been written to the index log.
	 * It is still not safe to release them from the cache: until they
	 * have also been written to the index file (below), a read would
	 * read the wrong data.
	 *
	 * The exception to this is freed blocks.
	 * These are cached separately in the free block list.
	 */
	TRACE_FLUSH("Wrote all blocks to the ilog", thread, &st, &now, 0);

	if (il->il_data_written()) {
		/* Flush the log before we flush the index.
		 *
		 * The reason is, we must make sure that changes that
		 * will be in the index are already in the transaction
		 * log.
		 *
		 * Only then are we able to undo those changes on
		 * recovery.
		 *
		 * Simple example:
		 * CREATE TABLE t1 (s1 INT PRIMARY KEY);
		 * INSERT INTO t1 VALUES (1);
		 *
		 * BEGIN;
		 * INSERT INTO t1 VALUES (2);
		 *
		 * --- INDEX IS FLUSHED HERE ---
		 *
		 * --- SERVER CRASH HERE ---
		 *
		 * The INSERT VALUES (2) has been written
		 * to the log, but not flushed.
		 * But the index has been updated.
		 * If the index is flushed it will contain
		 * the entry for the record with s1=2.
		 *
		 * This entry must be removed on recovery.
		 *
		 * To prevent this situation I flush the log
		 * here.
		 */
		if (!XT_IS_TEMP_TABLE(tab->tab_dic.dic_tab_flags)) {
			/* Note, thread->st_database may not be set here:
			 * #0	0x00defba3 in xt_spinlock_set at lock_xt.h:249
			 * #1	0x00defbfd in xt_spinlock_lock at lock_xt.h:299
			 * #2	0x00e65a98 in XTDatabaseLog::xlog_flush_pending at xactlog_xt.cc:737
			 * #3	0x00e6a27f in XTDatabaseLog::xlog_flush at xactlog_xt.cc:727
			 * #4	0x00e6a308 in xt_xlog_flush_log at xactlog_xt.cc:1476
			 * #5	0x00e22fee in xt_flush_indices at index_xt.cc:4599
			 * #6	0x00e49fe0 in xt_close_table at table_xt.cc:2678
			 * #7	0x00df0f10 in xt_db_pool_exit at database_xt.cc:758
			 * #8	0x00df3ca2 in db_finalize at database_xt.cc:342
			 * #9	0x00e17037 in xt_heap_release at heap_xt.cc:110
			 * #10	0x00df07c0 in db_hash_free at database_xt.cc:245
			 * #11	0x00e0b145 in xt_free_hashtable at hashtab_xt.cc:84
			 * #12	0x00df093e in xt_exit_databases at database_xt.cc:303
			 * #13	0x00e0d3cf in pbxt_call_exit at ha_pbxt.cc:946
			 * #14	0x00e0d498 in ha_exit at ha_pbxt.cc:975
			 * #15	0x00e0dce6 in pbxt_end at ha_pbxt.cc:1274
			 * #16	0x00e0dd03 in pbxt_panic at ha_pbxt.cc:1287
			 * #17	0x002321a1 in ha_finalize_handlerton at handler.cc:392
			 * #18	0x002f0567 in plugin_deinitialize at sql_plugin.cc:815
			 * #19	0x002f370e in reap_plugins at sql_plugin.cc:903
			 * #20	0x002f3eac in plugin_shutdown at sql_plugin.cc:1512
			 * #21	0x000f3ba6 in clean_up at mysqld.cc:1238
			 * #22	0x000f4046 in unireg_end at mysqld.cc:1166
			 * #23	0x000fc2c5 in kill_server at mysqld.cc:1108
			 * #24	0x000fd0d5 in kill_server_thread at mysqld.cc:1129
			 */
			if (!tab->tab_db->db_xlog.xlog_flush(ot->ot_thread))
				goto failed_2;
			TRACE_FLUSH("FLUSHED xlog", thread, &st, &now, 0);
		}

		if (!il->il_flush(ot))
			goto failed_2;
		TRACE_FLUSH("FLUSHED ilog", thread, &st, &now, ILOG_FLUSH);

		if (!il->il_apply_log_write(ot))
			goto failed_2;

		TRACE_FLUSH("wrote all blocks to index", thread, &st, &now, 0);
		/*
		 * {NOT-IN-IND-FILE}
		 * 1.0.01 - I have split apply log into flush and write
		 * parts.
		 *
		 * I have to write before waiting threads can continue
		 * because otherwise incorrect data will be read
		 * when the cache block is freed before it is written
		 * here.
		 *
		 * Risk: (see below).
		 */
		it.dli_reset();
		while ((block = tab->tab_ind_dirty_list.dl_next_block(&it))) {
			XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
			if (block->cb_state == IDX_CAC_BLOCK_LOGGED) {
				block->cb_state = IDX_CAC_BLOCK_CLEAN;
				ot->ot_thread->st_statistics.st_ind_cache_dirty--;
			}
			XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
		}

		/* This gives early notification to threads waiting for this
		 * operation to get to this point.
		 */
		if (fit) {
			fit->fit_blocks_flushed = fit->fit_dirty_blocks;
			fit->fit_dirty_blocks = 0;
			xt_async_task_notify(fit);
		}

		/*
		 * 1.0.01 - Note: I have moved the flush to here.
		 * It allows the calling thread to continue during the
		 * flush, but:
		 *
		 * The problem is, if the ilog flush fails then we have
		 * lost the information to re-create a consistent flush!
		 */
		if (!il->il_apply_log_flush(ot))
			goto failed_2;
		TRACE_FLUSH("FLUSHED index file", thread, &st, &now, INDEX_FLUSH);
	}

	flush_done:
	tab->tab_ind_dirty_list.dl_free_all();
	tab->tab_ind_flush_ilog = NULL;
	il->il_release();

	/* Mark this table as index flushed: */
	xt_checkpoint_set_flush_state(tab->tab_db, tab->tab_id, XT_CPT_STATE_DONE_INDEX);
	TRACE_FLUSH("set index state", thread, &st, &now, 0);

#ifdef TRACE_FLUSH_INDEX
	if (tnow) {
		printf("flush index (%d) %s DONE\n", (int) (time(NULL) - tnow), tab->tab_name->ps_path);
		fflush(stdout);
	}
#endif

	xt_unlock_mutex_ns(&tab->tab_ind_flush_lock);
	TRACE_FLUSH("UN-LOCKED flush index lock", thread, &st, &now, 0);

	if (!xt_end_checkpoint(tab->tab_db, ot->ot_thread, NULL))
		return FAILED;

#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache((XTIndex *) 1);
#endif
	return OK;

	failed:
	indp = tab->tab_dic.dic_keys;
	for (i=0; i<tab->tab_dic.dic_key_count; i++, indp++) {
		ind = *indp;
		XT_INDEX_UNLOCK(ind, ot);
	}

	failed_2:
	tab->tab_ind_dirty_list.dl_free_all();
	tab->tab_ind_flush_ilog = NULL;
	il->il_release();

	failed_3:
	xt_checkpoint_set_flush_state(tab->tab_db, tab->tab_id, XT_CPT_STATE_STOP_INDEX);

#ifdef TRACE_FLUSH_INDEX
	if (tnow) {
		printf("flush index (%d) %s FAILED\n", (int) (time(NULL) - tnow), tab->tab_name->ps_path);
		fflush(stdout);
	}
#endif

	xt_unlock_mutex_ns(&tab->tab_ind_flush_lock);
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return FAILED;
}
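
/*
 * A summary of the flush protocol above (a reading aid, derived from the
 * code itself):
 *
 * 1. Lock all indexes and move their dirty cache blocks onto the table's
 *    dirty list (state DIRTY -> FLUSHING).
 * 2. While still locked, log the free list and the index header; then
 *    unlock the indexes and log every dirty block to the ilog under its
 *    per-block lock (FLUSHING -> LOGGED).
 * 3. Flush the transaction log, then the ilog, so everything needed to
 *    redo the index changes is durable before the index file is touched.
 * 4. Apply the ilog to the index file, mark the blocks CLEAN, and only
 *    then flush the index file and retire the ilog (il_apply_log_flush).
 */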

void XTIndexLogPool::ilp_init(struct XTThread *self, struct XTDatabase *db, size_t log_buffer_size)
{
	char			path[PATH_MAX];
	XTOpenDirPtr	od;
	xtLogID			log_id;
	char			*file;
	XTIndexLogPtr	il = NULL;
	XTOpenTablePtr	ot = NULL;

	ilp_db = db;
	ilp_log_buffer_size = log_buffer_size;
	xt_init_mutex_with_autoname(self, &ilp_lock);

	xt_strcpy(PATH_MAX, path, db->db_main_path);
	xt_add_system_dir(PATH_MAX, path);
	if (xt_fs_exists(path)) {
		pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
		while (xt_dir_next(self, od)) {
			file = xt_dir_name(self, od);
			if (xt_starts_with(file, "ilog")) {
				if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
					if (!ilp_open_log(&il, log_id, FALSE, self))
						goto failed;
					if (il->il_tab_id && il->il_log_eof) {
						if (!il->il_open_table(&ot))
							goto failed;
						if (ot) {
							if (!il->il_apply_log_write(ot))
								goto failed;
							if (!il->il_apply_log_flush(ot))
								goto failed;
							ot->ot_thread = self;
							il->il_close_table(ot);
						}
					}
					il->il_close(TRUE);
				}
			}
		}
		freer_(); // xt_dir_close(od)
	}
	return;

	failed:
	/* TODO: Mark index as corrupted: */
	if (ot && il)
		il->il_close_table(ot);
	if (il)
		il->il_close(FALSE);
	xt_throw(self);
}
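
/*
 * Note that ilp_init() doubles as crash recovery for the index files: an
 * ilog that still names a table (il_tab_id != 0) and has a non-zero EOF was
 * not completely applied and flushed before the last shutdown, so it is
 * re-applied here before the log file is deleted. Re-applying is safe
 * because the log contains only absolute page images and byte ranges.
 */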
5174
5175
void XTIndexLogPool::ilp_close(struct XTThread *XT_UNUSED(self), xtBool lock)
5176
{
5177
	XTIndexLogPtr	il;
5178
5179
	if (lock)
5180
		xt_lock_mutex_ns(&ilp_lock);
5181
	while ((il = ilp_log_pool)) {
5182
		ilp_log_pool = il->il_next_in_pool;
5183
		il_pool_count--;
5184
		il->il_close(TRUE);
5185
	}
5186
	if (lock)
5187
		xt_unlock_mutex_ns(&ilp_lock);
5188
}
5189
5190
void XTIndexLogPool::ilp_exit(struct XTThread *self)
5191
{
5192
	ilp_close(self, FALSE);
5193
	ASSERT_NS(il_pool_count == 0);
5194
	xt_free_mutex(&ilp_lock);
5195
}
5196
5197
void XTIndexLogPool::ilp_name(size_t size, char *path, xtLogID log_id)
5198
{
5199
	char name[50];
5200
5201
	sprintf(name, "ilog-%lu.xt", (u_long) log_id);
5202
	xt_strcpy(size, path, ilp_db->db_main_path);
5203
	xt_add_system_dir(size, path);
5204
	xt_add_dir_char(size, path);
5205
	xt_strcat(size, path, name);
5206
}
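
/*
 * For illustration: log ID 3 yields a file named "ilog-3.xt" in the
 * database's system directory (whatever xt_add_system_dir() appends to
 * db_main_path). ilp_init() depends on this "ilog" prefix when it scans
 * the directory for logs to recover.
 */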

xtBool XTIndexLogPool::ilp_open_log(XTIndexLogPtr *ret_il, xtLogID log_id, xtBool excl, XTThreadPtr thread)
{
	char				log_path[PATH_MAX];
	XTIndexLogPtr		il;
	XTIndLogHeadDRec	log_head;
	size_t				read_size;

	ilp_name(PATH_MAX, log_path, log_id);
	if (!(il = (XTIndexLogPtr) xt_calloc_ns(sizeof(XTIndexLogRec))))
		return FAILED;
	xt_spinlock_init_with_autoname(NULL, &il->il_write_lock);
	il->il_log_id = log_id;
	il->il_pool = this;

	/* Writes will be rounded up to the nearest direct write block size (see {WRITE-IN-BLOCKS}),
	 * so make sure we have space in the buffer for that:
	 */
#ifdef DEBUG
	if (IND_WRITE_BLOCK_SIZE < XT_BLOCK_SIZE_FOR_DIRECT_IO)
		ASSERT_NS(FALSE);
#ifdef IND_WRITE_IN_BLOCK_SIZES
	if (XT_INDEX_PAGE_SIZE < IND_WRITE_BLOCK_SIZE)
		ASSERT_NS(FALSE);
#endif
#endif
#ifdef IND_FILL_BLOCK_TO_NEXT
	if (!(il->il_buffer = (xtWord1 *) xt_malloc_ns(ilp_log_buffer_size + XT_INDEX_PAGE_SIZE)))
		goto failed;
#else
	if (!(il->il_buffer = (xtWord1 *) xt_malloc_ns(ilp_log_buffer_size + IND_WRITE_BLOCK_SIZE)))
		goto failed;
#endif
	il->il_buffer_size = ilp_log_buffer_size;

	if (!(il->il_of = xt_open_file_ns(log_path, XT_FT_STANDARD, (excl ? XT_FS_EXCLUSIVE : 0) | XT_FS_CREATE | XT_FS_MAKE_PATH, 0)))
		goto failed;

	if (!xt_pread_file(il->il_of, 0, sizeof(XTIndLogHeadDRec), 0, &log_head, &read_size, &thread->st_statistics.st_ilog, thread))
		goto failed;

	if (read_size == sizeof(XTIndLogHeadDRec)) {
		il->il_tab_id = XT_GET_DISK_4(log_head.ilh_tab_id_4);
		il->il_log_eof = XT_GET_DISK_4(log_head.ilh_log_eof_4);
	}
	else {
		il->il_tab_id = 0;
		il->il_log_eof = 0;
	}

	*ret_il = il;
	return OK;

	failed:
	il->il_close(FALSE);
	return FAILED;
}

xtBool XTIndexLogPool::ilp_get_log(XTIndexLogPtr *ret_il, XTThreadPtr thread)
{
	XTIndexLogPtr	il;
	xtLogID			log_id = 0;

	xt_lock_mutex_ns(&ilp_lock);
	if ((il = ilp_log_pool)) {
		ilp_log_pool = il->il_next_in_pool;
		il_pool_count--;
	}
	else {
		ilp_next_log_id++;
		log_id = ilp_next_log_id;
	}
	xt_unlock_mutex_ns(&ilp_lock);
	if (!il) {
		if (!ilp_open_log(&il, log_id, TRUE, thread))
			return FAILED;
	}
	*ret_il = il;
	return OK;
}

void XTIndexLogPool::ilp_release_log(XTIndexLogPtr il)
{
	xt_lock_mutex_ns(&ilp_lock);
	if (il_pool_count == 5)
		il->il_close(TRUE);
	else {
		il_pool_count++;
		il->il_next_in_pool = ilp_log_pool;
		ilp_log_pool = il;
	}
	xt_unlock_mutex_ns(&ilp_lock);
}
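
/*
 * The pool keeps at most 5 idle logs; a log released beyond that is closed
 * and its file deleted (il_close(TRUE)). The cap is a hard-coded constant
 * here; there is no configuration option for it in this file.
 */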

xtBool XTIndexLog::il_reset(struct XTOpenTable *ot)
{
	XTIndLogHeadDRec	log_head;
	xtTableID			tab_id = ot->ot_table->tab_id;

	il_tab_id = tab_id;
	il_log_eof = 0;
	il_buffer_len = 0;
	il_buffer_offset = 0;

	/* We must write the header and flush here, otherwise the "previous"
	 * status (from the last flush run) could remain. A failure to write
	 * the file completely would leave the old header in place while other
	 * parts of the file had changed.
	 * This would lead to index corruption.
	 */
	log_head.ilh_data_type = XT_DT_LOG_HEAD;
	XT_SET_DISK_4(log_head.ilh_tab_id_4, tab_id);
	XT_SET_DISK_4(log_head.ilh_log_eof_4, 0);

	if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
		return FAILED;

	if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
		return FAILED;

	return OK;
}

xtBool XTIndexLog::il_data_written()
{
	return il_buffer_offset != 0 || il_buffer_len != 0;
}

void XTIndexLog::il_close(xtBool delete_it)
{
	xtLogID	log_id = il_log_id;

	if (il_of) {
		xt_close_file_ns(il_of);
		il_of = NULL;
	}

	if (delete_it && log_id) {
		char	log_path[PATH_MAX];

		il_pool->ilp_name(PATH_MAX, log_path, log_id);
		xt_fs_delete(NULL, log_path);
	}

	if (il_buffer) {
		xt_free_ns(il_buffer);
		il_buffer = NULL;
	}

	xt_spinlock_free(NULL, &il_write_lock);
	xt_free_ns(this);
}

void XTIndexLog::il_release()
{
	il_pool->ilp_db->db_indlogs.ilp_release_log(this);
}

xtBool XTIndexLog::il_require_space(size_t bytes, XTThreadPtr thread)
{
	if (il_buffer_len + bytes > il_buffer_size) {
		if (!xt_pwrite_file(il_of, il_buffer_offset, il_buffer_len, il_buffer, &thread->st_statistics.st_ilog, thread))
			return FAILED;
		il_buffer_offset += il_buffer_len;
		il_buffer_len = 0;
	}

	return OK;
}
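
/*
 * il_buffer_offset is the file position of the start of the in-memory
 * buffer and il_buffer_len the number of bytes buffered so far, so
 * il_buffer_offset + il_buffer_len is always the logical end of the log.
 * il_require_space() spills the buffer to disk whenever the next record
 * would not fit, which guarantees each record is built contiguously in
 * memory by the il_write_*() functions below.
 */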

xtBool XTIndexLog::il_write_byte(struct XTOpenTable *ot, xtWord1 byte)
{
	if (!il_require_space(1, ot->ot_thread))
		return FAILED;
	*(il_buffer + il_buffer_len) = byte;
	il_buffer_len++;
	return OK;
}

xtBool XTIndexLog::il_write_word4(struct XTOpenTable *ot, xtWord4 value)
{
	xtWord1 *buffer;

	if (!il_require_space(4, ot->ot_thread))
		return FAILED;
	buffer = il_buffer + il_buffer_len;
	XT_SET_DISK_4(buffer, value);
	il_buffer_len += 4;
	return OK;
}
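
/*
 * il_write_byte() and il_write_word4() emit a record's tag byte and
 * operands. XT_SET_DISK_4 stores the value in PBXT's fixed on-disk byte
 * order, so the log's format does not depend on the byte order of the
 * host that wrote it.
 */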

/*
 * This function assumes that the block is xlocked!
 */
xtBool XTIndexLog::il_write_block(struct XTOpenTable *ot, XTIndBlockPtr block)
{
	xtIndexNodeID		node_id;
	XTIdxBranchDPtr		node;
	u_int				block_len;

	node_id = block->cb_address;
	node = (XTIdxBranchDPtr) block->cb_data;
	block_len = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(node->tb_size_2));

	//il_min_byte_count += (block->cb_max_pos - block->cb_min_pos) + (block->cb_header ? XT_INDEX_PAGE_HEAD_SIZE : 0);
#ifdef IND_OPT_DATA_WRITTEN
	u_int				max_pos;
	u_int				min_pos;
	xtBool				eo_block = FALSE;

#ifdef IND_WRITE_IN_BLOCK_SIZES
	/* Round up to block boundary: */
	max_pos = ((block->cb_max_pos + XT_INDEX_PAGE_HEAD_SIZE + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
	if (max_pos > block_len)
		max_pos = block_len;
	max_pos -= XT_INDEX_PAGE_HEAD_SIZE;
	/* Round down to block boundary: */
	min_pos = ((block->cb_min_pos + XT_INDEX_PAGE_HEAD_SIZE) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
	if (min_pos > 0)
		min_pos -= XT_INDEX_PAGE_HEAD_SIZE;
#else
	max_pos = block->cb_max_pos;
	min_pos = block->cb_min_pos;
#endif
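
	/* Worked example of the rounding above, with illustrative values only
	 * (say IND_WRITE_BLOCK_SIZE is 512 and XT_INDEX_PAGE_HEAD_SIZE is 14):
	 * a change covering data offsets min_pos=600..max_pos=700 lies at page
	 * offsets 614..714, so the range is widened to whole 512-byte units,
	 * page offsets 512..1024, giving min_pos = 512-14 = 498 and
	 * max_pos = 1024-14 = 1010. This allows the write to be done in
	 * direct-I/O block sizes (see {WRITE-IN-BLOCKS}).
	 */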

	if (block_len == max_pos + XT_INDEX_PAGE_HEAD_SIZE)
		eo_block = TRUE;

	ASSERT_NS(max_pos <= XT_INDEX_PAGE_SIZE-XT_INDEX_PAGE_HEAD_SIZE);
	ASSERT_NS(min_pos <= block_len-XT_INDEX_PAGE_HEAD_SIZE);
	ASSERT_NS(max_pos <= block_len-XT_INDEX_PAGE_HEAD_SIZE);
	ASSERT_NS(min_pos <= max_pos);

	xt_spinlock_lock(&il_write_lock);
	if (block->cb_min_pos == block->cb_max_pos) {
		/* This means just the header was changed. */
#ifdef IND_WRITE_IN_BLOCK_SIZES
		XTIndShortPageDataDPtr		sh_data;

		if (!il_require_space(offsetof(XTIndShortPageDataDRec, ild_data) + IND_WRITE_BLOCK_SIZE, ot->ot_thread))
			goto failed;

		sh_data = (XTIndShortPageDataDPtr) (il_buffer + il_buffer_len);
		sh_data->ild_data_type = XT_DT_SHORT_IND_PAGE;
		XT_SET_DISK_4(sh_data->ild_page_id_4, XT_NODE_ID(node_id));
		XT_SET_DISK_2(sh_data->ild_size_2, IND_WRITE_BLOCK_SIZE);
		memcpy(sh_data->ild_data, block->cb_data, IND_WRITE_BLOCK_SIZE);
		il_buffer_len += offsetof(XTIndShortPageDataDRec, ild_data) + IND_WRITE_BLOCK_SIZE;
#else
		XTIndSetPageHeadDataDPtr	sph_data;

		if (!il_require_space(sizeof(XTIndSetPageHeadDataDRec), ot->ot_thread))
			goto failed;

		ASSERT_NS(sizeof(XTIndSetPageHeadDataDRec) <= il_buffer_size);

		sph_data = (XTIndSetPageHeadDataDPtr) (il_buffer + il_buffer_len);
		sph_data->ild_data_type = XT_DT_SET_PAGE_HEAD;
		XT_SET_DISK_4(sph_data->ild_page_id_4, XT_NODE_ID(node_id));
		XT_COPY_DISK_2(sph_data->ild_page_head_2, block->cb_data);
		il_buffer_len += sizeof(XTIndSetPageHeadDataDRec);
#endif
	}
#ifdef IND_WRITE_IN_BLOCK_SIZES
	else if (min_pos == 0 || (block->cb_header && min_pos == IND_WRITE_BLOCK_SIZE - XT_INDEX_PAGE_HEAD_SIZE))
#else
	else if (min_pos < 16 - XT_INDEX_PAGE_HEAD_SIZE)
#endif
	{
		/* Fuse, and write the whole block: */
		if (eo_block) {
			XTIndPageDataDPtr	p_data;

			if (!il_require_space(offsetof(XTIndPageDataDRec, ild_data) + block_len, ot->ot_thread))
				goto failed;

			ASSERT_NS(offsetof(XTIndPageDataDRec, ild_data) + block_len <= il_buffer_size);

			p_data = (XTIndPageDataDPtr) (il_buffer + il_buffer_len);
			p_data->ild_data_type = XT_DT_INDEX_PAGE;
			XT_SET_DISK_4(p_data->ild_page_id_4, XT_NODE_ID(node_id));
			memcpy(p_data->ild_data, block->cb_data, block_len);
			il_buffer_len += offsetof(XTIndPageDataDRec, ild_data) + block_len;
		}
		else {
			XTIndShortPageDataDPtr	sp_data;

			block_len = max_pos + XT_INDEX_PAGE_HEAD_SIZE;

			if (!il_require_space(offsetof(XTIndShortPageDataDRec, ild_data) + block_len, ot->ot_thread))
				goto failed;

			ASSERT_NS(offsetof(XTIndShortPageDataDRec, ild_data) + block_len <= il_buffer_size);

			sp_data = (XTIndShortPageDataDPtr) (il_buffer + il_buffer_len);
			sp_data->ild_data_type = XT_DT_SHORT_IND_PAGE;
			XT_SET_DISK_4(sp_data->ild_page_id_4, XT_NODE_ID(node_id));
			XT_SET_DISK_2(sp_data->ild_size_2, block_len);
			memcpy(sp_data->ild_data, block->cb_data, block_len);
			il_buffer_len += offsetof(XTIndShortPageDataDRec, ild_data) + block_len;
		}
	}
	else {
		block_len = max_pos - min_pos;

		if (block->cb_header) {
#ifdef IND_WRITE_IN_BLOCK_SIZES
			XTIndDoubleModPageDataDPtr	dd_data;

			ASSERT_NS(min_pos + XT_INDEX_PAGE_HEAD_SIZE >= 2*IND_WRITE_BLOCK_SIZE);
			ASSERT_NS((min_pos + XT_INDEX_PAGE_HEAD_SIZE) % IND_WRITE_BLOCK_SIZE == 0);
			if (!il_require_space(offsetof(XTIndDoubleModPageDataDRec, dld_data) + IND_WRITE_BLOCK_SIZE + block_len, ot->ot_thread))
				goto failed;

			dd_data = (XTIndDoubleModPageDataDPtr) (il_buffer + il_buffer_len);
			dd_data->dld_data_type = eo_block ? XT_DT_2_MOD_IND_PAGE_EOB : XT_DT_2_MOD_IND_PAGE;
			XT_SET_DISK_4(dd_data->dld_page_id_4, XT_NODE_ID(node_id));
			XT_SET_DISK_2(dd_data->dld_size1_2, IND_WRITE_BLOCK_SIZE);
			XT_SET_DISK_2(dd_data->dld_offset2_2, min_pos);
			XT_SET_DISK_2(dd_data->dld_size2_2, block_len);
			memcpy(dd_data->dld_data, block->cb_data, IND_WRITE_BLOCK_SIZE);
			memcpy(dd_data->dld_data + IND_WRITE_BLOCK_SIZE, block->cb_data + XT_INDEX_PAGE_HEAD_SIZE + min_pos, block_len);
			il_buffer_len += offsetof(XTIndDoubleModPageDataDRec, dld_data) + IND_WRITE_BLOCK_SIZE + block_len;
#else
			XTIndModPageHeadDataDPtr	mph_data;

			if (!il_require_space(offsetof(XTIndModPageHeadDataDRec, ild_data) + block_len, ot->ot_thread))
				goto failed;

			mph_data = (XTIndModPageHeadDataDPtr) (il_buffer + il_buffer_len);
			mph_data->ild_data_type = eo_block ? XT_DT_MOD_IND_PAGE_HEAD_EOB : XT_DT_MOD_IND_PAGE_HEAD;
			XT_SET_DISK_4(mph_data->ild_page_id_4, XT_NODE_ID(node_id));
			XT_SET_DISK_2(mph_data->ild_size_2, block_len);
			XT_SET_DISK_2(mph_data->ild_offset_2, min_pos);
			XT_COPY_DISK_2(mph_data->ild_page_head_2, block->cb_data);
			memcpy(mph_data->ild_data, block->cb_data + XT_INDEX_PAGE_HEAD_SIZE + min_pos, block_len);
			il_buffer_len += offsetof(XTIndModPageHeadDataDRec, ild_data) + block_len;
#endif
		}
		else {
			XTIndModPageDataDPtr	mp_data;

			if (!il_require_space(offsetof(XTIndModPageDataDRec, ild_data) + block_len, ot->ot_thread))
				goto failed;

			mp_data = (XTIndModPageDataDPtr) (il_buffer + il_buffer_len);
			mp_data->ild_data_type = eo_block ? XT_DT_MOD_IND_PAGE_EOB : XT_DT_MOD_IND_PAGE;
			XT_SET_DISK_4(mp_data->ild_page_id_4, XT_NODE_ID(node_id));
			XT_SET_DISK_2(mp_data->ild_size_2, block_len);
			XT_SET_DISK_2(mp_data->ild_offset_2, min_pos);
			memcpy(mp_data->ild_data, block->cb_data + XT_INDEX_PAGE_HEAD_SIZE + min_pos, block_len);
			il_buffer_len += offsetof(XTIndModPageDataDRec, ild_data) + block_len;
		}
	}

	block->cb_header = FALSE;
	block->cb_min_pos = 0xFFFF;
	block->cb_max_pos = 0;

#else // IND_OPT_DATA_WRITTEN
	XTIndPageDataDPtr	page_data;

	if (!il_require_space(offsetof(XTIndPageDataDRec, ild_data) + block_len, ot->ot_thread))
		goto failed;

	ASSERT_NS(offsetof(XTIndPageDataDRec, ild_data) + XT_INDEX_PAGE_SIZE <= il_buffer_size);

	page_data = (XTIndPageDataDPtr) (il_buffer + il_buffer_len);
	TRACK_BLOCK_TO_FLUSH(node_id);
	page_data->ild_data_type = XT_DT_INDEX_PAGE;
	XT_SET_DISK_4(page_data->ild_page_id_4, XT_NODE_ID(node_id));
	memcpy(page_data->ild_data, block->cb_data, block_len);

	il_buffer_len += offsetof(XTIndPageDataDRec, ild_data) + block_len;

#endif // IND_OPT_DATA_WRITTEN
	xt_spinlock_unlock(&il_write_lock);

	ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FLUSHING);
	block->cb_state = IDX_CAC_BLOCK_LOGGED;

	TRACK_BLOCK_TO_FLUSH(node_id);
	return OK;

	failed:
	xt_spinlock_unlock(&il_write_lock);
	return FAILED;
}

xtBool XTIndexLog::il_write_header(struct XTOpenTable *ot, size_t head_size, xtWord1 *head_buf)
{
	XTIndHeadDataDPtr	head_data;

	if (!il_require_space(offsetof(XTIndHeadDataDRec, ilh_data) + head_size, ot->ot_thread))
		return FAILED;

	head_data = (XTIndHeadDataDPtr) (il_buffer + il_buffer_len);
	head_data->ilh_data_type = XT_DT_HEADER;
	XT_SET_DISK_2(head_data->ilh_head_size_2, head_size);
	memcpy(head_data->ilh_data, head_buf, head_size);

	il_buffer_len += offsetof(XTIndHeadDataDRec, ilh_data) + head_size;

	return OK;
}

xtBool XTIndexLog::il_flush(struct XTOpenTable *ot)
{
	XTIndLogHeadDRec	log_head;
	xtTableID			tab_id = ot->ot_table->tab_id;

	if (il_buffer_len) {
		if (!xt_pwrite_file(il_of, il_buffer_offset, il_buffer_len, il_buffer, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
			return FAILED;
		il_buffer_offset += il_buffer_len;
		il_buffer_len = 0;
	}

	if (il_log_eof != il_buffer_offset) {
		log_head.ilh_data_type = XT_DT_LOG_HEAD;
		XT_SET_DISK_4(log_head.ilh_tab_id_4, tab_id);
		XT_SET_DISK_4(log_head.ilh_log_eof_4, il_buffer_offset);

		if (!XT_IS_TEMP_TABLE(ot->ot_table->tab_dic.dic_tab_flags)) {
			if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
				return FAILED;
		}

		if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
			return FAILED;

		if (!XT_IS_TEMP_TABLE(ot->ot_table->tab_dic.dic_tab_flags)) {
			if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
				return FAILED;
		}

		il_tab_id = tab_id;
		il_log_eof = il_buffer_offset;
	}
	return OK;
}
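
/*
 * The ordering in il_flush() is what makes the log trustworthy: the data is
 * flushed before the header is rewritten, and the header is flushed before
 * the caller proceeds, so ilh_log_eof_4 never points past data that is not
 * yet durable. Both flushes are skipped for temporary tables, which do not
 * survive a restart anyway.
 */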

#ifdef CHECK_IF_WRITE_WAS_OK
static void check_buff(void *in_a, void *in_b, int len)
{
	xtWord1 *a = (xtWord1 *) in_a;
	xtWord1 *b = (xtWord1 *) in_b;
	int offset = 0;

	while (offset < len) {
		if (*a != *b) {
			printf("Mismatch at offset = %d %x != %x\n", offset, (int) *a, (int) *b);
			//xt_dump_trace();
			ASSERT_NS(FALSE);
		}
		offset++;
		a++;
		b++;
	}
}
#endif

xtBool XTIndexLog::il_apply_log_write(struct XTOpenTable *ot)
{
	XT_NODE_TEMP;
	register XTTableHPtr	tab = ot->ot_table;
	off_t					offset;
	size_t					pos;
	xtWord1					*buffer;
	off_t					address;
	xtIndexNodeID			node_id;
	size_t					req_size = 0;
	XTIdxBranchDPtr			node;
	u_int					block_len;
	u_int					block_offset;
	xtWord1					*block_header;
	xtWord1					*block_data;
#ifdef CHECK_IF_WRITE_WAS_OK
	XTIndReferenceRec		c_iref;
	XTIdxBranchDPtr			c_node;
	u_int					c_block_len;
#endif

	offset = 0;
	while (offset < il_log_eof) {
		if (offset < il_buffer_offset ||
			offset >= il_buffer_offset + (off_t) il_buffer_len) {
			il_buffer_len = il_buffer_size;
			if (il_log_eof - offset < (off_t) il_buffer_len)
				il_buffer_len = (size_t) (il_log_eof - offset);

			/* Corrupt log?! */
			if (il_buffer_len < req_size) {
				xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_LOG_CORRUPT, xt_file_path(il_of));
				xt_log_and_clear_exception_ns();
				return OK;
			}
			if (!xt_pread_file(il_of, offset, il_buffer_len, il_buffer_len, il_buffer, NULL, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
				return FAILED;
			il_buffer_offset = offset;
		}
		pos = (size_t) (offset - il_buffer_offset);
		ASSERT_NS(pos < il_buffer_len);
#ifdef CHECK_IF_WRITE_WAS_OK
		node_id = 0;
#endif
		buffer = il_buffer + pos;
		switch (*buffer) {
			case XT_DT_LOG_HEAD:
				req_size = sizeof(XTIndLogHeadDRec);
				if (il_buffer_len - pos < req_size) {
					il_buffer_len = 0;
					continue;
				}
				offset += req_size;
				req_size = 0;
				break;
			case XT_DT_SHORT_IND_PAGE: {
				XTIndShortPageDataDPtr		sp_data;

				req_size = offsetof(XTIndShortPageDataDRec, ild_data);
				if (il_buffer_len - pos < req_size) {
					il_buffer_len = 0;
					continue;
				}

				sp_data = (XTIndShortPageDataDPtr) buffer;
				node_id = XT_RET_NODE_ID(XT_GET_DISK_4(sp_data->ild_page_id_4));
				block_len = XT_GET_DISK_2(sp_data->ild_size_2);
				block_data = sp_data->ild_data;
				goto do_ind_page;
			}
			case XT_DT_INDEX_PAGE:
				XTIndPageDataDPtr	p_data;

				req_size = offsetof(XTIndPageDataDRec, ild_data);
				if (il_buffer_len - pos < req_size + 2) {
					il_buffer_len = 0;
					continue;
				}

				p_data = (XTIndPageDataDPtr) buffer;
				node_id = XT_RET_NODE_ID(XT_GET_DISK_4(p_data->ild_page_id_4));
				node = (XTIdxBranchDPtr) p_data->ild_data;
				block_len = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(node->tb_size_2));
				block_data = p_data->ild_data;

				do_ind_page:
				if (block_len < 2 || block_len > XT_INDEX_PAGE_SIZE) {
					xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
					return FAILED;
				}

				req_size += block_len;
#ifdef IND_FILL_BLOCK_TO_NEXT
				/* Make sure we have the start of the next block in the buffer.
				 * This should always work, because an XT_DT_INDEX_PAGE is never
				 * the last block.
				 */
				if (il_buffer_len - pos < req_size + offsetof(XTIndPageDataDRec, ild_data))
#else
				if (il_buffer_len - pos < req_size)
#endif
				{
					il_buffer_len = 0;
					continue;
				}

				TRACK_BLOCK_FLUSH_N(node_id);
				address = xt_ind_node_to_offset(tab, node_id);
#ifdef IND_WRITE_IN_BLOCK_SIZES
				/* {WRITE-IN-BLOCKS} Round up the block size. Space has been provided. */
				block_len = ((block_len + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
#endif
				IDX_TRACE("%d- W%x\n", (int) XT_NODE_ID(node_id), (int) XT_GET_DISK_2(block_data));
#ifdef IND_FILL_BLOCK_TO_NEXT
				if (block_len < XT_INDEX_PAGE_SIZE) {
					XTIndPageDataDPtr	next_page_data;

					next_page_data = (XTIndPageDataDPtr) (buffer + req_size);
					if (next_page_data->ild_data_type == XT_DT_INDEX_PAGE) {
						xtIndexNodeID next_node_id;

						next_node_id = XT_RET_NODE_ID(XT_GET_DISK_4(next_page_data->ild_page_id_4));
						/* Write the whole page, if that means leaving no gaps! */
						if (next_node_id == node_id+1)
							block_len = XT_INDEX_PAGE_SIZE;
					}
				}
#endif
				ASSERT_NS(block_len >= 2 && block_len <= XT_INDEX_PAGE_SIZE);
				if (!il_pwrite_file(ot, address, block_len, block_data))
					return FAILED;

				offset += req_size;
				req_size = 0;
				break;
			case XT_DT_SET_PAGE_HEAD: {
				XTIndSetPageHeadDataDPtr	sph_data;

				req_size = sizeof(XTIndSetPageHeadDataDRec);
				if (il_buffer_len - pos < req_size) {
					il_buffer_len = 0;
					continue;
				}

				sph_data = (XTIndSetPageHeadDataDPtr) buffer;
				node_id = XT_RET_NODE_ID(XT_GET_DISK_4(sph_data->ild_page_id_4));
				block_offset = 0;
				block_len = 0;
				block_header = sph_data->ild_page_head_2;
				block_data = NULL;
				goto do_mod_page;
			}
			case XT_DT_MOD_IND_PAGE_HEAD:
			case XT_DT_MOD_IND_PAGE_HEAD_EOB: {
				XTIndModPageHeadDataDPtr	mph_data;

				req_size = offsetof(XTIndModPageHeadDataDRec, ild_data);
				if (il_buffer_len - pos < req_size) {
					il_buffer_len = 0;
					continue;
				}

				mph_data = (XTIndModPageHeadDataDPtr) buffer;
				node_id = XT_RET_NODE_ID(XT_GET_DISK_4(mph_data->ild_page_id_4));
				block_offset = XT_GET_DISK_2(mph_data->ild_offset_2);
				block_len = XT_GET_DISK_2(mph_data->ild_size_2);
				block_header = mph_data->ild_page_head_2;
				block_data = mph_data->ild_data;
				goto do_mod_page;
			}
			case XT_DT_MOD_IND_PAGE:
			case XT_DT_MOD_IND_PAGE_EOB:
				XTIndModPageDataDPtr		mp_data;

				req_size = offsetof(XTIndModPageDataDRec, ild_data);
				if (il_buffer_len - pos < req_size) {
					il_buffer_len = 0;
					continue;
				}

				mp_data = (XTIndModPageDataDPtr) buffer;
				node_id = XT_RET_NODE_ID(XT_GET_DISK_4(mp_data->ild_page_id_4));
				block_offset = XT_GET_DISK_2(mp_data->ild_offset_2);
				block_len = XT_GET_DISK_2(mp_data->ild_size_2);
				block_header = NULL;
				block_data = mp_data->ild_data;

				do_mod_page:
				if (block_offset + block_len > XT_INDEX_PAGE_DATA_SIZE) {
					xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, tab->tab_name);
					return FAILED;
				}

				req_size += block_len;
				if (il_buffer_len - pos < req_size) {
					il_buffer_len = 0;
					continue;
				}

				TRACK_BLOCK_FLUSH_N(node_id);
				address = xt_ind_node_to_offset(tab, node_id);
				/* {WRITE-IN-BLOCKS} Round up the block size. Space has been provided. */
				IDX_TRACE("%d- W%x\n", (int) XT_NODE_ID(node_id), (int) XT_GET_DISK_2(block_data));
				if (block_header) {
					if (!il_pwrite_file(ot, address, 2, block_header))
						return FAILED;
				}

				if (block_data) {
#ifdef IND_WRITE_IN_BLOCK_SIZES
					if (*buffer == XT_DT_MOD_IND_PAGE_HEAD_EOB || *buffer == XT_DT_MOD_IND_PAGE_EOB)
						block_len = ((block_len + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
#endif
					if (!il_pwrite_file(ot, address + XT_INDEX_PAGE_HEAD_SIZE + block_offset, block_len, block_data))
						return FAILED;
				}

				offset += req_size;
				req_size = 0;
				break;
			case XT_DT_FREE_LIST:
				xtWord4	block, nblock;
				union {
					xtWord1				buffer[IND_WRITE_BLOCK_SIZE];
					XTIndFreeBlockRec	free_block;
				} x;
				off_t	aoff;

				memset(x.buffer, 0, sizeof(XTIndFreeBlockRec));

				pos++;
				offset++;

				for (;;) {
					req_size = 8;
					if (il_buffer_len - pos < req_size) {
						il_buffer_len = il_buffer_size;
						if (il_log_eof - offset < (off_t) il_buffer_len)
							il_buffer_len = (size_t) (il_log_eof - offset);
						/* Corrupt log?! */
						if (il_buffer_len < req_size) {
							xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_LOG_CORRUPT, xt_file_path(il_of));
							xt_log_and_clear_exception_ns();
							return OK;
						}
						if (!xt_pread_file(il_of, offset, il_buffer_len, il_buffer_len, il_buffer, NULL, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
							return FAILED;
						pos = 0;
					}
					block = XT_GET_DISK_4(il_buffer + pos);
					nblock = XT_GET_DISK_4(il_buffer + pos + 4);
					if (nblock == 0xFFFFFFFF)
						break;
					aoff = xt_ind_node_to_offset(tab, XT_RET_NODE_ID(block));
					XT_SET_DISK_8(x.free_block.if_next_block_8, nblock);
					IDX_TRACE("%d- *%x\n", (int) block, (int) XT_GET_DISK_2(x.buffer));
					if (!il_pwrite_file(ot, aoff, IND_WRITE_BLOCK_SIZE, x.buffer))
						return FAILED;
					pos += 4;
					offset += 4;
				}

				offset += 8;
				req_size = 0;
				break;
			case XT_DT_HEADER:
				XTIndHeadDataDPtr	head_data;
				size_t				len;

				req_size = offsetof(XTIndHeadDataDRec, ilh_data);
				if (il_buffer_len - pos < req_size) {
					il_buffer_len = 0;
					continue;
				}
				head_data = (XTIndHeadDataDPtr) buffer;
				len = XT_GET_DISK_2(head_data->ilh_head_size_2);

				req_size = offsetof(XTIndHeadDataDRec, ilh_data) + len;
				if (il_buffer_len - pos < req_size) {
					il_buffer_len = 0;
					continue;
				}

				if (!il_pwrite_file(ot, 0, len, head_data->ilh_data))
					return FAILED;

				offset += req_size;
				req_size = 0;
				break;
			case XT_DT_2_MOD_IND_PAGE:
			case XT_DT_2_MOD_IND_PAGE_EOB:
				XTIndDoubleModPageDataDPtr	dd_data;
				u_int						block_len2;

				req_size = offsetof(XTIndDoubleModPageDataDRec, dld_data);
				if (il_buffer_len - pos < req_size) {
					il_buffer_len = 0;
					continue;
				}

				dd_data = (XTIndDoubleModPageDataDPtr) buffer;
				node_id = XT_RET_NODE_ID(XT_GET_DISK_4(dd_data->dld_page_id_4));
				block_len = XT_GET_DISK_2(dd_data->dld_size1_2);
				block_offset = XT_GET_DISK_2(dd_data->dld_offset2_2);
				block_len2 = XT_GET_DISK_2(dd_data->dld_size2_2);
				block_data = dd_data->dld_data;

				req_size += block_len + block_len2;
				if (il_buffer_len - pos < req_size)
				{
					il_buffer_len = 0;
					continue;
				}

				TRACK_BLOCK_FLUSH_N(node_id);
				address = xt_ind_node_to_offset(tab, node_id);
				IDX_TRACE("%d- W%x\n", (int) XT_NODE_ID(node_id), (int) XT_GET_DISK_2(block_data));
				if (!il_pwrite_file(ot, address, block_len, block_data))
					return FAILED;

#ifdef IND_WRITE_IN_BLOCK_SIZES
				if (*buffer == XT_DT_2_MOD_IND_PAGE_EOB)
					block_len2 = ((block_len2 + IND_WRITE_BLOCK_SIZE - 1) / IND_WRITE_BLOCK_SIZE) * IND_WRITE_BLOCK_SIZE;
#endif
				if (!il_pwrite_file(ot, address + XT_INDEX_PAGE_HEAD_SIZE + block_offset, block_len2, block_data + block_len))
					return FAILED;

				offset += req_size;
				req_size = 0;
				break;
			default:
				xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_INDEX_LOG_CORRUPT, xt_file_path(il_of));
				xt_log_and_clear_exception_ns();
				return OK;
		}
#ifdef CHECK_IF_WRITE_WAS_OK
		if (node_id) {
			if (!xt_ind_get(ot, node_id, &c_iref))
				ASSERT_NS(FALSE);
			if (c_iref.ir_block) {
				c_node = (XTIdxBranchDPtr) c_iref.ir_block->cb_data;
				c_block_len = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(c_node->tb_size_2));

				if (!xt_pread_file(ot->ot_ind_file, address, XT_INDEX_PAGE_SIZE, 0, &ot->ot_ind_tmp_buf, NULL, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
					ASSERT_NS(FALSE);
				if (c_iref.ir_block->cb_min_pos == 0xFFFF)
					check_buff(&ot->ot_ind_tmp_buf, c_node, c_block_len);
				else {
					if (!c_iref.ir_block->cb_header)
						check_buff(&ot->ot_ind_tmp_buf, c_node, 2);
					check_buff(ot->ot_ind_tmp_buf.tb_data, c_node->tb_data, c_iref.ir_block->cb_min_pos);
					check_buff(ot->ot_ind_tmp_buf.tb_data + c_iref.ir_block->cb_max_pos,
						c_node->tb_data + c_iref.ir_block->cb_max_pos,
						c_block_len - XT_INDEX_PAGE_HEAD_SIZE - c_iref.ir_block->cb_max_pos);
				}
				xt_ind_release(ot, NULL, XT_UNLOCK_WRITE, &c_iref);
			}
		}
#endif
		if (il_bytes_written >= IND_FLUSH_THRESHOLD) {
			if (!il_flush_file(ot))
				return FAILED;
		}
	}
	return OK;
}
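
/*
 * A note on the scan in il_apply_log_write(): the log is read through a
 * sliding window. Whenever a record does not fit in what remains of the
 * buffer (il_buffer_len - pos < req_size), il_buffer_len is zeroed and the
 * loop re-reads the window starting at the record's own offset, so no
 * record ever has to be reassembled across two reads. A record larger than
 * the whole buffer can only mean a corrupt log; this is reported and the
 * remainder of the log is ignored (the function returns OK rather than
 * failing the flush).
 */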

xtBool XTIndexLog::il_apply_log_flush(struct XTOpenTable *ot)
{
	register XTTableHPtr	tab = ot->ot_table;
	XTIndLogHeadDRec		log_head;

#ifdef PRINT_IND_FLUSH_STATS
	xtWord8					b_flush_time = ot->ot_thread->st_statistics.st_ind.ts_flush_time;
#endif
	if (!il_flush_file(ot))
		return FAILED;
#ifdef PRINT_IND_FLUSH_STATS
	char	buf1[30];
	char	buf2[30];
	char	buf3[30];

	double time;
	double kb;

	ot->ot_table->tab_ind_flush_time += ot->ot_thread->st_statistics.st_ind.ts_flush_time - b_flush_time;
	ot->ot_table->tab_ind_flush++;

	time = (double) ot->ot_table->tab_ind_flush_time / (double) 1000000 / (double) ot->ot_table->tab_ind_flush;
	kb = (double) ot->ot_table->tab_ind_write / (double) ot->ot_table->tab_ind_flush / (double) 1024;
	printf("TIME: %s      Kbytes: %s      Mps: %s      Flush Count: %d\n",
		idx_format(buf1, time),
		idx_format(buf2, kb),
		idx_format_mb(buf3, kb / time),
		(int) ot->ot_table->tab_ind_flush
		);
#endif

	log_head.ilh_data_type = XT_DT_LOG_HEAD;
	XT_SET_DISK_4(log_head.ilh_tab_id_4, il_tab_id);
	XT_SET_DISK_4(log_head.ilh_log_eof_4, 0);

	if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
		return FAILED;

	if (!XT_IS_TEMP_TABLE(tab->tab_dic.dic_tab_flags)) {
		if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
			return FAILED;
	}
	return OK;
}
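
/*
 * Zeroing ilh_log_eof_4 above is what retires the log: once the index file
 * itself has been flushed, this ilog must not be re-applied, and ilp_init()
 * only recovers logs whose EOF is non-zero.
 */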

inline xtBool XTIndexLog::il_pwrite_file(struct XTOpenTable *ot, off_t offs, size_t siz, void *dat)
{
#ifdef IND_WRITE_IN_BLOCK_SIZES
	ASSERT_NS(((offs) % IND_WRITE_BLOCK_SIZE) == 0);
	ASSERT_NS(((siz) % IND_WRITE_BLOCK_SIZE) == 0);
#endif
	il_bytes_written += siz;
#ifdef PRINT_IND_FLUSH_STATS
	xtBool ok;

	u_int	b_write = ot->ot_thread->st_statistics.st_ind.ts_write;
	ok = xt_pwrite_file(ot->ot_ind_file, offs, siz, dat, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread);
	ot->ot_table->tab_ind_write += ot->ot_thread->st_statistics.st_ind.ts_write - b_write;
	return ok;
#else
	return xt_pwrite_file(ot->ot_ind_file, offs, siz, dat, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread);
#endif
}

inline xtBool XTIndexLog::il_flush_file(struct XTOpenTable *ot)
{
	xtBool ok = TRUE;

	il_bytes_written = 0;
	if (!XT_IS_TEMP_TABLE(ot->ot_table->tab_dic.dic_tab_flags)) {
		ok = xt_flush_file(ot->ot_ind_file, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread);
	}
	return ok;
}

xtBool XTIndexLog::il_open_table(struct XTOpenTable **ot)
{
	return xt_db_open_pool_table_ns(ot, il_pool->ilp_db, il_tab_id);
}

void XTIndexLog::il_close_table(struct XTOpenTable *ot)
{
	xt_db_return_table_to_pool_ns(ot);
}