/* Copyright (c) 2005 PrimeBase Technologies GmbH, Germany
 *
 * PrimeBase XT
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * 2005-05-24	Paul McCullagh
 *
 * H&G2JCtL
 */

#include "xt_config.h"

#ifdef DRIZZLED
#include <bitset>
#endif

#ifndef XT_WIN
#include <unistd.h>
#endif

#include <stdio.h>
#include <time.h>

#include "pthread_xt.h"
#include "thread_xt.h"
#include "filesys_xt.h"
#include "cache_xt.h"
#include "table_xt.h"
#include "trace_xt.h"
#include "util_xt.h"

#define XT_TIME_DIFF(start, now) (\
	((xtWord4) (now) < (xtWord4) (start)) ? \
	((xtWord4) 0XFFFFFFFF - ((xtWord4) (start) - (xtWord4) (now))) : \
	((xtWord4) (now) - (xtWord4) (start)))
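
/* Note on XT_TIME_DIFF: cb_ru_time and cg_ru_now are 32-bit use counters, so
 * the macro computes an unsigned "distance" that tolerates counter wrap-around.
 * For example (illustrative values only): start = 0xFFFFFFF0, now = 0x10 gives
 * 0xFFFFFFFF - (0xFFFFFFF0 - 0x10) = 0x1F, i.e. roughly the 0x20 counter ticks
 * that really elapsed (the result is off by one after a wrap, which is harmless
 * for the LRU recency test in ind_cac_fetch()).
 */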

/*
 * -----------------------------------------------------------------------
 * D I S K   C A C H E
 */

#define IDX_CAC_SEGMENT_COUNT		((off_t) 1 << XT_INDEX_CACHE_SEGMENT_SHIFTS)
#define IDX_CAC_SEGMENT_MASK		(IDX_CAC_SEGMENT_COUNT - 1)

#ifdef XT_NO_ATOMICS
#define IDX_CAC_USE_PTHREAD_RW
#else
//#define IDX_CAC_USE_PTHREAD_RW
#define IDX_CAC_USE_XSMUTEX
//#define IDX_USE_SPINXSLOCK
#endif

#if defined(IDX_CAC_USE_PTHREAD_RW)
#define IDX_CAC_LOCK_TYPE				xt_rwlock_type
#define IDX_CAC_INIT_LOCK(s, i)			xt_init_rwlock_with_autoname(s, &(i)->cs_lock)
#define IDX_CAC_FREE_LOCK(s, i)			xt_free_rwlock(&(i)->cs_lock)
#define IDX_CAC_READ_LOCK(i, o)			xt_slock_rwlock_ns(&(i)->cs_lock)
#define IDX_CAC_WRITE_LOCK(i, o)		xt_xlock_rwlock_ns(&(i)->cs_lock)
#define IDX_CAC_UNLOCK(i, o)			xt_unlock_rwlock_ns(&(i)->cs_lock)
#elif defined(IDX_CAC_USE_XSMUTEX)
#define IDX_CAC_LOCK_TYPE				XTMutexXSLockRec
#define IDX_CAC_INIT_LOCK(s, i)			xt_xsmutex_init_with_autoname(s, &(i)->cs_lock)
#define IDX_CAC_FREE_LOCK(s, i)			xt_xsmutex_free(s, &(i)->cs_lock)
#define IDX_CAC_READ_LOCK(i, o)			xt_xsmutex_slock(&(i)->cs_lock, (o)->t_id)
#define IDX_CAC_WRITE_LOCK(i, o)		xt_xsmutex_xlock(&(i)->cs_lock, (o)->t_id)
#define IDX_CAC_UNLOCK(i, o)			xt_xsmutex_unlock(&(i)->cs_lock, (o)->t_id)
#elif defined(IDX_CAC_USE_SPINXSLOCK)
#define IDX_CAC_LOCK_TYPE				XTSpinXSLockRec
#define IDX_CAC_INIT_LOCK(s, i)			xt_spinxslock_init_with_autoname(s, &(i)->cs_lock)
#define IDX_CAC_FREE_LOCK(s, i)			xt_spinxslock_free(s, &(i)->cs_lock)
#define IDX_CAC_READ_LOCK(i, s)			xt_spinxslock_slock(&(i)->cs_lock, (s)->t_id)
#define IDX_CAC_WRITE_LOCK(i, s)		xt_spinxslock_xlock(&(i)->cs_lock, FALSE, (s)->t_id)
#define IDX_CAC_UNLOCK(i, s)			xt_spinxslock_unlock(&(i)->cs_lock, (s)->t_id)
#else
#error Please define the lock type
#endif

#ifdef XT_NO_ATOMICS
#define ID_HANDLE_USE_PTHREAD_RW
#else
//#define ID_HANDLE_USE_PTHREAD_RW
#define ID_HANDLE_USE_SPINLOCK
#endif

#if defined(ID_HANDLE_USE_PTHREAD_RW)
#define ID_HANDLE_LOCK_TYPE				xt_mutex_type
#define ID_HANDLE_INIT_LOCK(s, i)		xt_init_mutex_with_autoname(s, i)
#define ID_HANDLE_FREE_LOCK(s, i)		xt_free_mutex(i)
#define ID_HANDLE_LOCK(i)				xt_lock_mutex_ns(i)
#define ID_HANDLE_UNLOCK(i)				xt_unlock_mutex_ns(i)
#elif defined(ID_HANDLE_USE_SPINLOCK)
#define ID_HANDLE_LOCK_TYPE				XTSpinLockRec
#define ID_HANDLE_INIT_LOCK(s, i)		xt_spinlock_init_with_autoname(s, i)
#define ID_HANDLE_FREE_LOCK(s, i)		xt_spinlock_free(s, i)
#define ID_HANDLE_LOCK(i)				xt_spinlock_lock(i)
#define ID_HANDLE_UNLOCK(i)				xt_spinlock_unlock(i)
#endif

#define XT_HANDLE_SLOTS					37

/*
#ifdef DEBUG
#define XT_INIT_HANDLE_COUNT			0
#define XT_INIT_HANDLE_BLOCKS			0
#else
#define XT_INIT_HANDLE_COUNT			40
#define XT_INIT_HANDLE_BLOCKS			10
#endif
*/

/* A disk cache segment. The cache is divided into a number of segments
 * to improve concurrency.
 */
typedef struct DcSegment {
	IDX_CAC_LOCK_TYPE	cs_lock;						/* The cache segment lock. */
	XTIndBlockPtr		*cs_hash_table;
} DcSegmentRec, *DcSegmentPtr;

typedef struct DcHandleSlot {
	ID_HANDLE_LOCK_TYPE	hs_handles_lock;
	XTIndHandleBlockPtr	hs_free_blocks;
	XTIndHandlePtr		hs_free_handles;
	XTIndHandlePtr		hs_used_handles;
} DcHandleSlotRec, *DcHandleSlotPtr;
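
/* Handle slots: index scanners do not keep the cache-block lock while they
 * use a page; they hold an XTIndHandle instead (see xt_ind_get_handle()).
 * The handles that reference a block are kept on one of XT_HANDLE_SLOTS
 * lists, selected by cb_address % XT_HANDLE_SLOTS, so that the per-slot
 * list lock (hs_handles_lock) does not become a single point of contention.
 */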

typedef struct DcGlobals {
	xt_mutex_type		cg_lock;						/* The public cache lock. */
	DcSegmentRec		cg_segment[IDX_CAC_SEGMENT_COUNT];
	XTIndBlockPtr		cg_blocks;
#ifdef XT_USE_DIRECT_IO_ON_INDEX
	xtWord1				*cg_buffer;
#endif
	XTIndBlockPtr		cg_free_list;
	xtWord4				cg_free_count;
	xtWord4				cg_ru_now;						/* A counter as described by Jim Starkey (my thanks) */
	XTIndBlockPtr		cg_lru_block;
	XTIndBlockPtr		cg_mru_block;
	xtWord4				cg_hash_size;
	xtWord4				cg_block_count;
	xtWord4				cg_max_free;
#ifdef DEBUG_CHECK_IND_CACHE
	u_int				cg_reserved_by_ots;				/* Number of blocks reserved by open tables. */
	u_int				cg_read_count;					/* Number of blocks being read. */
#endif

	/* Index cache handles: */
	DcHandleSlotRec		cg_handle_slot[XT_HANDLE_SLOTS];
} DcGlobalsRec;

static DcGlobalsRec	ind_cac_globals;
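
/* How a page is located in the cache (see ind_cac_fetch() below): the index
 * node address and file id are combined into a hash value; its low
 * XT_INDEX_CACHE_SEGMENT_SHIFTS bits select one of the IDX_CAC_SEGMENT_COUNT
 * segments (each with its own cs_lock), and the remaining bits, taken modulo
 * cg_hash_size, select a bucket in that segment's cs_hash_table. The LRU
 * chain (cg_lru_block/cg_mru_block) and the free list are global and are
 * protected by cg_lock.
 */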

#ifdef XT_USE_MYSYS
#ifdef xtPublic
#undef xtPublic
#endif
#include "my_global.h"
#include "my_sys.h"
#include "keycache.h"
KEY_CACHE my_cache;
#undef	pthread_rwlock_rdlock
#undef	pthread_rwlock_wrlock
#undef	pthread_rwlock_try_wrlock
#undef	pthread_rwlock_unlock
#undef	pthread_mutex_lock
#undef	pthread_mutex_unlock
#undef	pthread_cond_wait
#undef	pthread_cond_broadcast
#undef	xt_mutex_type
#define xtPublic
#endif

/*
 * -----------------------------------------------------------------------
 * INDEX CACHE HANDLES
 */

static XTIndHandlePtr ind_alloc_handle()
{
	XTIndHandlePtr handle;

	if (!(handle = (XTIndHandlePtr) xt_calloc_ns(sizeof(XTIndHandleRec))))
		return NULL;
	xt_spinlock_init_with_autoname(NULL, &handle->ih_lock);
	return handle;
}

static void ind_free_handle(XTIndHandlePtr handle)
{
	xt_spinlock_free(NULL, &handle->ih_lock);
	xt_free_ns(handle);
}

static void ind_handle_exit(XTThreadPtr self)
{
	DcHandleSlotPtr		hs;
	XTIndHandlePtr		handle;
	XTIndHandleBlockPtr	hptr;

	for (int i=0; i<XT_HANDLE_SLOTS; i++) {
		hs = &ind_cac_globals.cg_handle_slot[i];

		while (hs->hs_used_handles) {
			handle = hs->hs_used_handles;
			xt_ind_release_handle(handle, FALSE, self);
		}

		while (hs->hs_free_blocks) {
			hptr = hs->hs_free_blocks;
			hs->hs_free_blocks = hptr->hb_next;
			xt_free(self, hptr);
		}

		while (hs->hs_free_handles) {
			handle = hs->hs_free_handles;
			hs->hs_free_handles = handle->ih_next;
			ind_free_handle(handle);
		}

		ID_HANDLE_FREE_LOCK(self, &hs->hs_handles_lock);
	}
}

static void ind_handle_init(XTThreadPtr self)
{
	DcHandleSlotPtr		hs;

	for (int i=0; i<XT_HANDLE_SLOTS; i++) {
		hs = &ind_cac_globals.cg_handle_slot[i];
		memset(hs, 0, sizeof(DcHandleSlotRec));
		ID_HANDLE_INIT_LOCK(self, &hs->hs_handles_lock);
	}
}

//#define CHECK_HANDLE_STRUCTS

#ifdef CHECK_HANDLE_STRUCTS
static int gdummy = 0;

static void ic_stop_here()
{
	gdummy = gdummy + 1;
	printf("Nooo %d!\n", gdummy);
}

static void ic_check_handle_structs()
{
	XTIndHandlePtr		handle, phandle;
	XTIndHandleBlockPtr	hptr, phptr;
	int					count = 0;
	int					ctest;

	phandle = NULL;
	handle = ind_cac_globals.cg_used_handles;
	while (handle) {
		if (handle == phandle)
			ic_stop_here();
		if (handle->ih_prev != phandle)
			ic_stop_here();
		if (handle->ih_cache_reference) {
			ctest = handle->x.ih_cache_block->cb_handle_count;
			if (ctest == 0 || ctest > 100)
				ic_stop_here();
		}
		else {
			ctest = handle->x.ih_handle_block->hb_ref_count;
			if (ctest == 0 || ctest > 100)
				ic_stop_here();
		}
		phandle = handle;
		handle = handle->ih_next;
		count++;
		if (count > 1000)
			ic_stop_here();
	}

	count = 0;
	phptr = NULL;
	hptr = ind_cac_globals.cg_free_blocks;
	while (hptr) {
		if (hptr == phptr)
			ic_stop_here();
		phptr = hptr;
		hptr = hptr->hb_next;
		count++;
		if (count > 1000)
			ic_stop_here();
	}

	count = 0;
	phandle = NULL;
	handle = ind_cac_globals.cg_free_handles;
	while (handle) {
		if (handle == phandle)
			ic_stop_here();
		phandle = handle;
		handle = handle->ih_next;
		count++;
		if (count > 1000)
			ic_stop_here();
	}
}
#endif

/*
 * Get a handle to the index block.
 * This function is called by index scanners (readers).
 */
xtPublic XTIndHandlePtr xt_ind_get_handle(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref)
{
	DcHandleSlotPtr	hs;
	XTIndHandlePtr	handle;

	hs = &ind_cac_globals.cg_handle_slot[iref->ir_block->cb_address % XT_HANDLE_SLOTS];

	ASSERT_NS(iref->ir_xlock == FALSE);
	ASSERT_NS(iref->ir_updated == FALSE);
	ID_HANDLE_LOCK(&hs->hs_handles_lock);
#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	if ((handle = hs->hs_free_handles))
		hs->hs_free_handles = handle->ih_next;
	else {
		if (!(handle = ind_alloc_handle())) {
			ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
			xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
			return NULL;
		}
	}
	if (hs->hs_used_handles)
		hs->hs_used_handles->ih_prev = handle;
	handle->ih_next = hs->hs_used_handles;
	handle->ih_prev = NULL;
	handle->ih_address = iref->ir_block->cb_address;
	handle->ih_cache_reference = TRUE;
	handle->x.ih_cache_block = iref->ir_block;
	handle->ih_branch = iref->ir_branch;
	/* {HANDLE-COUNT-USAGE}
	 * This is safe because:
	 *
	 * I have an Slock on the cache block, and I have
	 * at least an Slock on the index.
	 * So this excludes anyone who is reading
	 * cb_handle_count in the index
	 * (all cache block writers, and the freer).
	 *
	 * The increment is safe because I have the list
	 * lock (hs_handles_lock), which is required by anyone else
	 * who increments or decrements this value.
	 */
	iref->ir_block->cb_handle_count++;
	hs->hs_used_handles = handle;
#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
	xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
	return handle;
}
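
/* Typical reader usage: xt_ind_fetch() locks the page, xt_ind_get_handle()
 * then exchanges that page lock for a handle (note that it releases the
 * cache-block lock itself before returning), the caller reads via the handle,
 * and finally calls xt_ind_release_handle(). If an updater needs to modify
 * the page in the meantime, xt_ind_copy_on_write() redirects all open handles
 * to a private copy of the branch.
 */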

xtPublic void xt_ind_release_handle(XTIndHandlePtr handle, xtBool have_lock, XTThreadPtr thread)
{
	DcHandleSlotPtr	hs;
	XTIndBlockPtr	block = NULL;
	u_int			hash_idx = 0;
	DcSegmentPtr	seg = NULL;
	XTIndBlockPtr	xblock;

	(void) thread; /*DRIZZLED*/

	/* The lock order is:
	 * 1. Cache segment (cs_lock) - This is only taken by ind_free_block()!
	 * 1. S/Slock cache block (cb_lock)
	 * 2. List lock (hs_handles_lock).
	 * 3. Handle lock (ih_lock)
	 */
	if (!have_lock)
		xt_spinlock_lock(&handle->ih_lock);

	/* Get the lock on the cache page if required: */
	if (handle->ih_cache_reference) {
		u_int			file_id;
		xtIndexNodeID	address;

		block = handle->x.ih_cache_block;

		file_id = block->cb_file_id;
		address = block->cb_address;
		hash_idx = XT_NODE_ID(address) + (file_id * 223);
		seg = &ind_cac_globals.cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
		hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % ind_cac_globals.cg_hash_size;
	}

	xt_spinlock_unlock(&handle->ih_lock);

	/* Because of the lock order, I have to release the
	 * handle before I get a lock on the cache block.
	 *
	 * But, by doing this, this cache block may be gone!
	 */
	if (block) {
		IDX_CAC_READ_LOCK(seg, thread);
		xblock = seg->cs_hash_table[hash_idx];
		while (xblock) {
			if (block == xblock) {
				/* Found the block...
				 * {HANDLE-COUNT-SLOCK}
				 * 04.05.2009, changed to slock.
				 * The xlock causes too much contention
				 * on the cache block for read only loads.
				 *
				 * Is it safe?
				 * See below...
				 */
				XT_IPAGE_READ_LOCK(&block->cb_lock);
				goto block_found;
			}
			xblock = xblock->cb_next;
		}
		block = NULL;
		block_found:
		IDX_CAC_UNLOCK(seg, thread);
	}

	hs = &ind_cac_globals.cg_handle_slot[handle->ih_address % XT_HANDLE_SLOTS];

	ID_HANDLE_LOCK(&hs->hs_handles_lock);
#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif

	/* I don't need to lock the handle because I have locked
	 * the list, and no other thread can change the
	 * handle without first getting a lock on the list.
	 *
	 * In addition, the caller is the only owner of the
	 * handle, and the only thread with an independent
	 * reference to the handle.
	 * All other access occurs via the list.
	 */

	/* Remove the reference to the cache or a handle block: */
	if (handle->ih_cache_reference) {
		ASSERT_NS(block == handle->x.ih_cache_block);
		ASSERT_NS(block && block->cb_handle_count > 0);
		/* {HANDLE-COUNT-USAGE}
		 * This is safe here because I have excluded
		 * all readers by taking an Xlock on the
		 * cache block (CHANGED - see below).
		 *
		 * {HANDLE-COUNT-SLOCK}
		 * 04.05.2009, changed to slock.
		 * Should be OK, because:
		 * I have a lock on the list lock (hs_handles_lock),
		 * which prevents concurrent updates to cb_handle_count.
		 *
		 * I also have a read lock on the cache block,
		 * but not a lock on the index. As a result, we cannot
		 * exclude all index writers (and readers of
		 * cb_handle_count).
		 */
		block->cb_handle_count--;
	}
	else {
		XTIndHandleBlockPtr	hptr = handle->x.ih_handle_block;

		ASSERT_NS(!handle->ih_cache_reference);
		ASSERT_NS(hptr->hb_ref_count > 0);
		hptr->hb_ref_count--;
		if (!hptr->hb_ref_count) {
			/* Put it back on the free list: */
			hptr->hb_next = hs->hs_free_blocks;
			hs->hs_free_blocks = hptr;
		}
	}

	/* Unlink the handle: */
	if (handle->ih_next)
		handle->ih_next->ih_prev = handle->ih_prev;
	if (handle->ih_prev)
		handle->ih_prev->ih_next = handle->ih_next;
	if (hs->hs_used_handles == handle)
		hs->hs_used_handles = handle->ih_next;

	/* Put it on the free list: */
	handle->ih_next = hs->hs_free_handles;
	hs->hs_free_handles = handle;

#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	ID_HANDLE_UNLOCK(&hs->hs_handles_lock);

	if (block)
		XT_IPAGE_UNLOCK(&block->cb_lock, FALSE);
}

/* Call this function before a referenced cache block is modified!
 * This function is called by index updaters.
 */
xtPublic xtBool xt_ind_copy_on_write(XTIndReferencePtr iref)
{
	DcHandleSlotPtr		hs;
	XTIndHandleBlockPtr	hptr;
	u_int				branch_size;
	XTIndHandlePtr		handle;
	u_int				i = 0;

	hs = &ind_cac_globals.cg_handle_slot[iref->ir_block->cb_address % XT_HANDLE_SLOTS];

	ID_HANDLE_LOCK(&hs->hs_handles_lock);

	/* {HANDLE-COUNT-USAGE}
	 * This is only called by updaters of this index block, or
	 * the freer, which holds an Xlock on the index block.
	 * These are all mutually exclusive for the index block.
	 *
	 * {HANDLE-COUNT-SLOCK}
	 * Do this check again, after we have the list lock (hs_handles_lock).
	 * There is a small chance that the count has changed since we last
	 * checked, because xt_ind_release_handle() only holds
	 * an slock on the index page.
	 *
	 * An updater can sometimes have an XLOCK on the index and an slock
	 * on the cache block. In this case xt_ind_release_handle()
	 * could have run through.
	 */
	if (!iref->ir_block->cb_handle_count) {
		ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
		return OK;
	}

#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	if ((hptr = hs->hs_free_blocks))
		hs->hs_free_blocks = hptr->hb_next;
	else {
		if (!(hptr = (XTIndHandleBlockPtr) xt_malloc_ns(sizeof(XTIndHandleBlockRec)))) {
			ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
			return FAILED;
		}
	}

	branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref->ir_branch->tb_size_2));
	memcpy(&hptr->hb_branch, iref->ir_branch, branch_size);
	hptr->hb_ref_count = iref->ir_block->cb_handle_count;

	handle = hs->hs_used_handles;
	while (handle) {
		if (handle->ih_branch == iref->ir_branch) {
			i++;
			xt_spinlock_lock(&handle->ih_lock);
			ASSERT_NS(handle->ih_cache_reference);
			handle->ih_cache_reference = FALSE;
			handle->x.ih_handle_block = hptr;
			handle->ih_branch = &hptr->hb_branch;
			xt_spinlock_unlock(&handle->ih_lock);
#ifndef DEBUG
			if (i == hptr->hb_ref_count)
				break;
#endif
		}
		handle = handle->ih_next;
	}
#ifdef DEBUG
	ASSERT_NS(hptr->hb_ref_count == i);
#endif
	/* {HANDLE-COUNT-USAGE}
	 * It is safe to modify cb_handle_count when I have the
	 * list lock, and I have excluded all readers!
	 */
	iref->ir_block->cb_handle_count = 0;
#ifdef CHECK_HANDLE_STRUCTS
	ic_check_handle_structs();
#endif
	ID_HANDLE_UNLOCK(&hs->hs_handles_lock);

	return OK;
}
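
/* After copy-on-write the handles no longer point into the cache page: they
 * share one heap-allocated XTIndHandleBlockRec whose hb_ref_count equals the
 * number of redirected handles. Each xt_ind_release_handle() decrements the
 * count, and the last release puts the handle block back on the slot's
 * hs_free_blocks list for reuse.
 */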

xtPublic void xt_ind_lock_handle(XTIndHandlePtr handle)
{
	xt_spinlock_lock(&handle->ih_lock);
}

xtPublic void xt_ind_unlock_handle(XTIndHandlePtr handle)
{
	xt_spinlock_unlock(&handle->ih_lock);
}

/*
 * -----------------------------------------------------------------------
 * INIT/EXIT
 */

/*
 * Initialize the disk cache.
 */
xtPublic void xt_ind_init(XTThreadPtr self, size_t cache_size)
{
	XTIndBlockPtr	block;

#ifdef XT_USE_MYSYS
	init_key_cache(&my_cache, 1024, cache_size, 100, 300);
#endif
	/* Memory is devoted to the page data alone; I no longer count the size of the directory,
	 * or the page overhead: */
	ind_cac_globals.cg_block_count = cache_size / XT_INDEX_PAGE_SIZE;
	ind_cac_globals.cg_hash_size = ind_cac_globals.cg_block_count / (IDX_CAC_SEGMENT_COUNT >> 1);
	ind_cac_globals.cg_max_free = ind_cac_globals.cg_block_count / 10;
	if (ind_cac_globals.cg_max_free < 8)
		ind_cac_globals.cg_max_free = 8;
	if (ind_cac_globals.cg_max_free > 128)
		ind_cac_globals.cg_max_free = 128;

	try_(a) {
		for (u_int i=0; i<IDX_CAC_SEGMENT_COUNT; i++) {
			ind_cac_globals.cg_segment[i].cs_hash_table = (XTIndBlockPtr *) xt_calloc(self, ind_cac_globals.cg_hash_size * sizeof(XTIndBlockPtr));
			IDX_CAC_INIT_LOCK(self, &ind_cac_globals.cg_segment[i]);
		}

		block = (XTIndBlockPtr) xt_malloc(self, ind_cac_globals.cg_block_count * sizeof(XTIndBlockRec));
		ind_cac_globals.cg_blocks = block;
		xt_init_mutex_with_autoname(self, &ind_cac_globals.cg_lock);
#ifdef XT_USE_DIRECT_IO_ON_INDEX
		xtWord1 *buffer;
#ifdef XT_WIN
		size_t	psize = 512;
#else
		size_t	psize = getpagesize();
#endif
		size_t	diff;

		buffer = (xtWord1 *) xt_malloc(self, (ind_cac_globals.cg_block_count * XT_INDEX_PAGE_SIZE));
		diff = (size_t) buffer % psize;
		if (diff != 0) {
			xt_free(self, buffer);
			buffer = (xtWord1 *) xt_malloc(self, (ind_cac_globals.cg_block_count * XT_INDEX_PAGE_SIZE) + psize);
			diff = (size_t) buffer % psize;
			if (diff != 0)
				diff = psize - diff;
		}
		ind_cac_globals.cg_buffer = buffer;
		buffer += diff;
#endif

		for (u_int i=0; i<ind_cac_globals.cg_block_count; i++) {
			XT_IPAGE_INIT_LOCK(self, &block->cb_lock);
			block->cb_state = IDX_CAC_BLOCK_FREE;
			block->cb_next = ind_cac_globals.cg_free_list;
#ifdef XT_USE_DIRECT_IO_ON_INDEX
			block->cb_data = buffer;
			buffer += XT_INDEX_PAGE_SIZE;
#endif
#ifdef CHECK_BLOCK_TRAILERS
			XT_SET_DISK_4(block->cp_check, 0xDEADBEEF);
#endif
			ind_cac_globals.cg_free_list = block;
			block++;
		}
		ind_cac_globals.cg_free_count = ind_cac_globals.cg_block_count;
#ifdef DEBUG_CHECK_IND_CACHE
		ind_cac_globals.cg_reserved_by_ots = 0;
#endif
		ind_handle_init(self);
	}
	catch_(a) {
		xt_ind_exit(self);
		throw_();
	}
	cont_(a);
}
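
/* Sizing example (illustrative figures only; XT_INDEX_PAGE_SIZE and
 * IDX_CAC_SEGMENT_COUNT are build-time constants): with a 32MB cache and
 * 16KB index pages there are 2048 blocks; with 8 segments each per-segment
 * hash table gets 2048 / (8 >> 1) = 512 buckets; and cg_max_free would be
 * 2048 / 10 = 204, clamped to the [8, 128] range, so 128.
 */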

#ifdef CHECK_BLOCK_TRAILERS
xtPublic void check_block_trailers()
{
	XTIndBlockPtr	block;

	block = ind_cac_globals.cg_blocks;
	for (u_int i=0; i<ind_cac_globals.cg_block_count; i++) {
		ASSERT_NS(XT_GET_DISK_4(block->cp_check) == 0xDEADBEEF);
		block++;
	}
}
#endif

xtPublic void xt_ind_exit(XTThreadPtr self)
{
#ifdef XT_USE_MYSYS
	end_key_cache(&my_cache, 1);
#endif
	for (u_int i=0; i<IDX_CAC_SEGMENT_COUNT; i++) {
		if (ind_cac_globals.cg_segment[i].cs_hash_table) {
			xt_free(self, ind_cac_globals.cg_segment[i].cs_hash_table);
			ind_cac_globals.cg_segment[i].cs_hash_table = NULL;
			IDX_CAC_FREE_LOCK(self, &ind_cac_globals.cg_segment[i]);
		}
	}

	/* Must be done before freeing the blocks! */
	ind_handle_exit(self);

	if (ind_cac_globals.cg_blocks) {
		xt_free(self, ind_cac_globals.cg_blocks);
		ind_cac_globals.cg_blocks = NULL;
		xt_free_mutex(&ind_cac_globals.cg_lock);
	}
#ifdef XT_USE_DIRECT_IO_ON_INDEX
	if (ind_cac_globals.cg_buffer) {
		xt_free(self, ind_cac_globals.cg_buffer);
		ind_cac_globals.cg_buffer = NULL;
	}
#endif

	memset(&ind_cac_globals, 0, sizeof(ind_cac_globals));
}

xtPublic xtInt8 xt_ind_get_usage()
{
	xtInt8 size = 0;

	size = (xtInt8) (ind_cac_globals.cg_block_count - ind_cac_globals.cg_free_count) * (xtInt8) XT_INDEX_PAGE_SIZE;
	return size;
}

xtPublic xtInt8 xt_ind_get_size()
{
	xtInt8 size = 0;

	size = (xtInt8) ind_cac_globals.cg_block_count * (xtInt8) XT_INDEX_PAGE_SIZE;
	return size;
}

xtPublic u_int xt_ind_get_blocks()
{
	return ind_cac_globals.cg_block_count;
}

/*
 * -----------------------------------------------------------------------
 * INDEX CHECKING
 */

xtPublic void xt_ind_check_cache(XTIndexPtr ind)
{
	XTIndBlockPtr	block;
	u_int			free_count, inuse_count, clean_count;
	xtBool			check_count = FALSE;

	if (ind == (XTIndex *) 1) {
		ind = NULL;
		check_count = TRUE;
	}

	// Check the dirty list:
	if (ind) {
		u_int cnt = 0;

		block = ind->mi_dirty_list;
		while (block) {
			cnt++;
			ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_DIRTY);
			block = block->cb_dirty_next;
		}
		ASSERT_NS(ind->mi_dirty_blocks == cnt);
	}

	xt_lock_mutex_ns(&ind_cac_globals.cg_lock);

	// Check the free list:
	free_count = 0;
	block = ind_cac_globals.cg_free_list;
	while (block) {
		free_count++;
		ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FREE);
		block = block->cb_next;
	}
	ASSERT_NS(ind_cac_globals.cg_free_count == free_count);

	/* Check the LRU list: */
	XTIndBlockPtr list_block, plist_block;

	plist_block = NULL;
	list_block = ind_cac_globals.cg_lru_block;
	if (list_block) {
		ASSERT_NS(ind_cac_globals.cg_mru_block != NULL);
		ASSERT_NS(ind_cac_globals.cg_mru_block->cb_mr_used == NULL);
		ASSERT_NS(list_block->cb_lr_used == NULL);
		inuse_count = 0;
		clean_count = 0;
		while (list_block) {
			inuse_count++;
			ASSERT_NS(IDX_CAC_NOT_FREE(list_block->cb_state));
			if (list_block->cb_state == IDX_CAC_BLOCK_CLEAN)
				clean_count++;
			ASSERT_NS(block != list_block);
			ASSERT_NS(list_block->cb_lr_used == plist_block);
			plist_block = list_block;
			list_block = list_block->cb_mr_used;
		}
		ASSERT_NS(ind_cac_globals.cg_mru_block == plist_block);
	}
	else {
		inuse_count = 0;
		clean_count = 0;
		ASSERT_NS(ind_cac_globals.cg_mru_block == NULL);
	}

#ifdef DEBUG_CHECK_IND_CACHE
	ASSERT_NS(free_count + inuse_count + ind_cac_globals.cg_reserved_by_ots + ind_cac_globals.cg_read_count == ind_cac_globals.cg_block_count);
#endif
	xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);
	if (check_count) {
		/* We have just flushed, check how much is now free/clean. */
		if (free_count + clean_count < 10) {
			/* This could be a problem: */
			printf("Cache very low!\n");
		}
	}
}

/*
 * -----------------------------------------------------------------------
 * FREEING INDEX CACHE
 */

/*
 * This function returns TRUE if the block is freed.
 * This function returns FALSE if the block cannot be found, or the
 * block is not clean.
 *
 * We also return FALSE if we cannot copy the block to the handle
 * (if this is required). This will be due to out-of-memory!
 */
static xtBool ind_free_block(XTOpenTablePtr ot, XTIndBlockPtr block)
{
	XTIndBlockPtr	xblock, pxblock;
	u_int			hash_idx;
	u_int			file_id;
	xtIndexNodeID	address;
	DcSegmentPtr	seg;

	(void) ot; /*DRIZZLED*/

#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	file_id = block->cb_file_id;
	address = block->cb_address;

	hash_idx = XT_NODE_ID(address) + (file_id * 223);
	seg = &ind_cac_globals.cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % ind_cac_globals.cg_hash_size;

	IDX_CAC_WRITE_LOCK(seg, ot->ot_thread);

	pxblock = NULL;
	xblock = seg->cs_hash_table[hash_idx];
	while (xblock) {
		if (block == xblock) {
			/* Found the block... */
			/* It is possible that a thread enters this code holding a
			 * lock on a page. This can cause a deadlock:
			 *
			 * #0	0x91faa2ce in semaphore_wait_signal_trap
			 * #1	0x91fb1da5 in pthread_mutex_lock
			 * #2	0x00e2ec13 in xt_p_mutex_lock at pthread_xt.cc:544
			 * #3	0x00e6c30a in xt_xsmutex_xlock at lock_xt.cc:1547
			 * #4	0x00dee402 in ind_free_block at cache_xt.cc:879
			 * #5	0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033
			 * #6	0x00def8d1 in xt_ind_reserve at cache_xt.cc:1513
			 * #7	0x00e22118 in xt_idx_insert at index_xt.cc:2047
			 * #8	0x00e4d7ee in xt_tab_new_record at table_xt.cc:4702
			 * #9	0x00e0ff0b in ha_pbxt::write_row at ha_pbxt.cc:2340
			 * #10	0x0023a00f in handler::ha_write_row at handler.cc:4570
			 * #11	0x001a32c8 in write_record at sql_insert.cc:1568
			 * #12	0x001ab635 in mysql_insert at sql_insert.cc:812
			 * #13	0x0010e068 in mysql_execute_command at sql_parse.cc:3066
			 * #14	0x0011480d in mysql_parse at sql_parse.cc:5787
			 * #15	0x00115afb in dispatch_command at sql_parse.cc:1200
			 * #16	0x00116de2 in do_command at sql_parse.cc:857
			 * #17	0x00101ee4 in handle_one_connection at sql_connect.cc:1115
			 * #18	0x91fdb155 in _pthread_start
			 * #19	0x91fdb012 in thread_start
			 *
			 * #0	0x91fb146e in __semwait_signal
			 * #1	0x91fb12ef in nanosleep$UNIX2003
			 * #2	0x91fb1236 in usleep$UNIX2003
			 * #3	0x00e52112 in xt_yield at thread_xt.cc:1274
			 * #4	0x00e6c0eb in xt_spinxslock_xlock at lock_xt.cc:1456
			 * #5	0x00dee444 in ind_free_block at cache_xt.cc:886
			 * #6	0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033
			 * #7	0x00deeaf0 in ind_cac_fetch at cache_xt.cc:1130
			 * #8	0x00def604 in xt_ind_fetch at cache_xt.cc:1386
			 * #9	0x00e2159a in xt_idx_update_row_id at index_xt.cc:2489
			 * #10	0x00e603c8 in xn_sw_clean_indices at xaction_xt.cc:1932
			 * #11	0x00e606d4 in xn_sw_cleanup_variation at xaction_xt.cc:2056
			 * #12	0x00e60e29 in xn_sw_cleanup_xact at xaction_xt.cc:2276
			 * #13	0x00e615ed in xn_sw_main at xaction_xt.cc:2433
			 * #14	0x00e61919 in xn_sw_run_thread at xaction_xt.cc:2564
			 * #15	0x00e53f80 in thr_main at thread_xt.cc:1017
			 * #16	0x91fdb155 in _pthread_start
			 * #17	0x91fdb012 in thread_start
			 *
			 * So we back off if a lock is held!
			 */
			if (!XT_IPAGE_WRITE_TRY_LOCK(&block->cb_lock, ot->ot_thread->t_id)) {
				IDX_CAC_UNLOCK(seg, ot->ot_thread);
#ifdef DEBUG_CHECK_IND_CACHE
				xt_ind_check_cache(NULL);
#endif
				return FALSE;
			}
			if (block->cb_state != IDX_CAC_BLOCK_CLEAN) {
				/* This block cannot be freed: */
				XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
				IDX_CAC_UNLOCK(seg, ot->ot_thread);
#ifdef DEBUG_CHECK_IND_CACHE
				xt_ind_check_cache(NULL);
#endif
				return FALSE;
			}

			goto free_the_block;
		}
		pxblock = xblock;
		xblock = xblock->cb_next;
	}

	IDX_CAC_UNLOCK(seg, ot->ot_thread);

	/* Not found (this can happen, if block was freed by another thread) */
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return FALSE;

	free_the_block:

	/* If the block is referenced by a handle, then we
	 * have to copy the data to the handle before we
	 * free the page:
	 */
	/* {HANDLE-COUNT-USAGE}
	 * This access is safe because:
	 *
	 * We have an Xlock on the cache block, which excludes
	 * all other writers that want to change the cache block
	 * and also all readers of the cache block, because
	 * they all have at least an Slock on the cache block.
	 */
	if (block->cb_handle_count) {
		XTIndReferenceRec	iref;

		iref.ir_xlock = TRUE;
		iref.ir_updated = FALSE;
		iref.ir_block = block;
		iref.ir_branch = (XTIdxBranchDPtr) block->cb_data;
		if (!xt_ind_copy_on_write(&iref)) {
			XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
			return FALSE;
		}
	}

	/* Block is clean, remove from the hash table: */
	if (pxblock)
		pxblock->cb_next = block->cb_next;
	else
		seg->cs_hash_table[hash_idx] = block->cb_next;

	xt_lock_mutex_ns(&ind_cac_globals.cg_lock);

	/* Remove from the MRU list: */
	if (ind_cac_globals.cg_lru_block == block)
		ind_cac_globals.cg_lru_block = block->cb_mr_used;
	if (ind_cac_globals.cg_mru_block == block)
		ind_cac_globals.cg_mru_block = block->cb_lr_used;

	/* Note, I am updating blocks for which I have no lock
	 * here. But I think this is OK because I have a lock
	 * for the MRU list.
	 */
	if (block->cb_lr_used)
		block->cb_lr_used->cb_mr_used = block->cb_mr_used;
	if (block->cb_mr_used)
		block->cb_mr_used->cb_lr_used = block->cb_lr_used;

	/* The block is now free: */
	block->cb_next = ind_cac_globals.cg_free_list;
	ind_cac_globals.cg_free_list = block;
	ind_cac_globals.cg_free_count++;
	ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
	block->cb_state = IDX_CAC_BLOCK_FREE;
	IDX_TRACE("%d- f%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(block->cb_data));

	/* Unlock BEFORE the block is reused! */
	XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);

	xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);

	IDX_CAC_UNLOCK(seg, ot->ot_thread);

#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return TRUE;
}

#define IND_CACHE_MAX_BLOCKS_TO_FREE		100

/*
 * Return the number of blocks freed.
 *
 * The idea is to grab a list of blocks to free.
 * The list consists of the LRU blocks that are
 * clean.
 *
 * Free as many as possible (up to a maximum of blocks_required)
 * from the list, even if the LRU position has changed
 * (or we have a race if there are too few blocks).
 * However, if a block cannot be found, or is dirty,
 * we must skip it.
 *
 * Repeat until we find no blocks for the list, or
 * we have freed 'blocks_required'.
 *
 * 'not_this' is a block that must not be freed because
 * it is locked by the calling thread!
 */
static u_int ind_cac_free_lru_blocks(XTOpenTablePtr ot, u_int blocks_required, XTIdxBranchDPtr not_this)
{
	register DcGlobalsRec	*dcg = &ind_cac_globals;
	XTIndBlockPtr			to_free[IND_CACHE_MAX_BLOCKS_TO_FREE];
	int						count;
	XTIndBlockPtr			block;
	u_int					blocks_freed = 0;
	XTIndBlockPtr			locked_block;

#ifdef XT_USE_DIRECT_IO_ON_INDEX
#error This will not work!
#endif
	locked_block = (XTIndBlockPtr) ((xtWord1 *) not_this - offsetof(XTIndBlockRec, cb_data));

	retry:
	xt_lock_mutex_ns(&ind_cac_globals.cg_lock);
	block = dcg->cg_lru_block;
	count = 0;
	while (block && count < IND_CACHE_MAX_BLOCKS_TO_FREE) {
		if (block != locked_block && block->cb_state == IDX_CAC_BLOCK_CLEAN) {
			to_free[count] = block;
			count++;
		}
		block = block->cb_mr_used;
	}
	xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);

	if (!count)
		return blocks_freed;

	for (int i=0; i<count; i++) {
		if (ind_free_block(ot, to_free[i]))
			blocks_freed++;
		if (blocks_freed >= blocks_required &&
			ind_cac_globals.cg_free_count >= ind_cac_globals.cg_max_free + blocks_required)
			return blocks_freed;
	}

	goto retry;
}

/*
 * -----------------------------------------------------------------------
 * MAIN CACHE FUNCTIONS
 */

/*
 * Fetch the block. Note, if we are about to write the block
 * then there is no need to read it from disk!
 */
static XTIndBlockPtr ind_cac_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, DcSegmentPtr *ret_seg, xtBool read_data)
{
	register XTOpenFilePtr	file = ot->ot_ind_file;
	register XTIndBlockPtr	block, new_block;
	register DcSegmentPtr	seg;
	register u_int			hash_idx;
	register DcGlobalsRec	*dcg = &ind_cac_globals;
	size_t					red_size;

#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	/* Address, plus file ID multiplied by my favorite prime number! */
	hash_idx = XT_NODE_ID(address) + (file->fr_id * 223);
	seg = &dcg->cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % dcg->cg_hash_size;

	IDX_CAC_READ_LOCK(seg, ot->ot_thread);
	block = seg->cs_hash_table[hash_idx];
	while (block) {
		if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
			ASSERT_NS(block->cb_state != IDX_CAC_BLOCK_FREE);

			/* Check how recently this page has been used: */
			if (XT_TIME_DIFF(block->cb_ru_time, dcg->cg_ru_now) > (dcg->cg_block_count >> 1)) {
				xt_lock_mutex_ns(&dcg->cg_lock);

				/* Move to the front of the MRU list: */
				block->cb_ru_time = ++dcg->cg_ru_now;
				if (dcg->cg_mru_block != block) {
					/* Remove from the MRU list: */
					if (dcg->cg_lru_block == block)
						dcg->cg_lru_block = block->cb_mr_used;
					if (block->cb_lr_used)
						block->cb_lr_used->cb_mr_used = block->cb_mr_used;
					if (block->cb_mr_used)
						block->cb_mr_used->cb_lr_used = block->cb_lr_used;

					/* Make the block the most recently used: */
					if ((block->cb_lr_used = dcg->cg_mru_block))
						dcg->cg_mru_block->cb_mr_used = block;
					block->cb_mr_used = NULL;
					dcg->cg_mru_block = block;
					if (!dcg->cg_lru_block)
						dcg->cg_lru_block = block;
				}

				xt_unlock_mutex_ns(&dcg->cg_lock);
			}

			*ret_seg = seg;
#ifdef DEBUG_CHECK_IND_CACHE
			xt_ind_check_cache(NULL);
#endif
			ot->ot_thread->st_statistics.st_ind_cache_hit++;
			return block;
		}
		block = block->cb_next;
	}

	/* Block not found... */
	IDX_CAC_UNLOCK(seg, ot->ot_thread);

	/* Check the open table reserve list first: */
	if ((new_block = ot->ot_ind_res_bufs)) {
		ot->ot_ind_res_bufs = new_block->cb_next;
		ot->ot_ind_res_count--;
#ifdef DEBUG_CHECK_IND_CACHE
		xt_lock_mutex_ns(&dcg->cg_lock);
		dcg->cg_reserved_by_ots--;
		dcg->cg_read_count++;
		xt_unlock_mutex_ns(&dcg->cg_lock);
#endif
		goto use_free_block;
	}

	free_some_blocks:
	if (!dcg->cg_free_list) {
		if (!ind_cac_free_lru_blocks(ot, 1, NULL)) {
			if (!dcg->cg_free_list) {
				xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_INDEX_CACHE);
#ifdef DEBUG_CHECK_IND_CACHE
				xt_ind_check_cache(NULL);
#endif
				return NULL;
			}
		}
	}

	/* Get a free block: */
	xt_lock_mutex_ns(&dcg->cg_lock);
	if (!(new_block = dcg->cg_free_list)) {
		xt_unlock_mutex_ns(&dcg->cg_lock);
		goto free_some_blocks;
	}
	ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_FREE);
	dcg->cg_free_list = new_block->cb_next;
	dcg->cg_free_count--;
#ifdef DEBUG_CHECK_IND_CACHE
	dcg->cg_read_count++;
#endif
	xt_unlock_mutex_ns(&dcg->cg_lock);

	use_free_block:
	new_block->cb_address = address;
	new_block->cb_file_id = file->fr_id;
	ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_FREE);
	new_block->cb_state = IDX_CAC_BLOCK_CLEAN;
	new_block->cb_handle_count = 0;
	new_block->cp_del_count = 0;
	new_block->cb_dirty_next = NULL;
	new_block->cb_dirty_prev = NULL;
#ifdef IND_OPT_DATA_WRITTEN
	new_block->cb_header = FALSE;
	new_block->cb_min_pos = 0xFFFF;
	new_block->cb_max_pos = 0;
#endif

	if (read_data) {
		if (!xt_pread_file(file, xt_ind_node_to_offset(ot->ot_table, address), XT_INDEX_PAGE_SIZE, 0, new_block->cb_data, &red_size, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread)) {
			xt_lock_mutex_ns(&dcg->cg_lock);
			new_block->cb_next = dcg->cg_free_list;
			dcg->cg_free_list = new_block;
			dcg->cg_free_count++;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_read_count--;
#endif
			ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_CLEAN);
			new_block->cb_state = IDX_CAC_BLOCK_FREE;
			IDX_TRACE("%d- F%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
			xt_unlock_mutex_ns(&dcg->cg_lock);
#ifdef DEBUG_CHECK_IND_CACHE
			xt_ind_check_cache(NULL);
#endif
			return NULL;
		}
		IDX_TRACE("%d- R%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
		ot->ot_thread->st_statistics.st_ind_cache_miss++;
	}
	else
		red_size = 0;
	// PMC - I don't think this is required! memset(new_block->cb_data + red_size, 0, XT_INDEX_PAGE_SIZE - red_size);

	IDX_CAC_WRITE_LOCK(seg, ot->ot_thread);
	block = seg->cs_hash_table[hash_idx];
	while (block) {
		if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
			/* Oops, someone else was faster! */
			xt_lock_mutex_ns(&dcg->cg_lock);
			new_block->cb_next = dcg->cg_free_list;
			dcg->cg_free_list = new_block;
			dcg->cg_free_count++;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_read_count--;
#endif
			ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_CLEAN);
			new_block->cb_state = IDX_CAC_BLOCK_FREE;
			IDX_TRACE("%d- F%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
			xt_unlock_mutex_ns(&dcg->cg_lock);
			goto done_ok;
		}
		block = block->cb_next;
	}
	block = new_block;

	/* Make the block the most recently used: */
	xt_lock_mutex_ns(&dcg->cg_lock);
	block->cb_ru_time = ++dcg->cg_ru_now;
	if ((block->cb_lr_used = dcg->cg_mru_block))
		dcg->cg_mru_block->cb_mr_used = block;
	block->cb_mr_used = NULL;
	dcg->cg_mru_block = block;
	if (!dcg->cg_lru_block)
		dcg->cg_lru_block = block;
#ifdef DEBUG_CHECK_IND_CACHE
	dcg->cg_read_count--;
#endif
	xt_unlock_mutex_ns(&dcg->cg_lock);

	/* {LAZY-DEL-INDEX-ITEMS}
	 * Conditionally count the number of deleted entries in the index:
	 * We do this before other threads can read the block.
	 */
	if (ind && ind->mi_lazy_delete && read_data)
		xt_ind_count_deleted_items(ot->ot_table, ind, block);

	/* Add to the hash table: */
	block->cb_next = seg->cs_hash_table[hash_idx];
	seg->cs_hash_table[hash_idx] = block;

	done_ok:
	*ret_seg = seg;
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return block;
}
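
/* Summary of the fetch path above: look the page up under the segment's
 * shared lock; on a hit, optionally bump it to the MRU end of the global list
 * (only when its last use is more than half a cache-size of counter ticks
 * ago, to keep cg_lock traffic down). On a miss, take a block from the open
 * table's reserve list, the global free list, or by evicting clean LRU
 * blocks, read the page from disk without any cache lock held, then re-check
 * the hash chain under the segment's write lock in case a concurrent thread
 * loaded the same page first.
 */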

static xtBool ind_cac_get(XTOpenTablePtr ot, xtIndexNodeID address, DcSegmentPtr *ret_seg, XTIndBlockPtr *ret_block)
{
	register XTOpenFilePtr	file = ot->ot_ind_file;
	register XTIndBlockPtr	block;
	register DcSegmentPtr	seg;
	register u_int			hash_idx;
	register DcGlobalsRec	*dcg = &ind_cac_globals;

	hash_idx = XT_NODE_ID(address) + (file->fr_id * 223);
	seg = &dcg->cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % dcg->cg_hash_size;

	IDX_CAC_READ_LOCK(seg, ot->ot_thread);
	block = seg->cs_hash_table[hash_idx];
	while (block) {
		if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
			ASSERT_NS(block->cb_state != IDX_CAC_BLOCK_FREE);

			*ret_seg = seg;
			*ret_block = block;
			return OK;
		}
		block = block->cb_next;
	}
	IDX_CAC_UNLOCK(seg, ot->ot_thread);

	/* Block not found: */
	*ret_seg = NULL;
	*ret_block = NULL;
	return OK;
}

xtPublic xtBool xt_ind_write(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, size_t size, xtWord1 *data)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!(block = ind_cac_fetch(ot, ind, address, &seg, FALSE)))
		return FAILED;

	XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
	if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
			XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
			IDX_CAC_UNLOCK(seg, ot->ot_thread);
			return FAILED;
		}
	}
#ifdef IND_OPT_DATA_WRITTEN
	block->cb_header = TRUE;
	block->cb_min_pos = 0;
	if (size-XT_INDEX_PAGE_HEAD_SIZE > block->cb_max_pos)
		block->cb_max_pos = size-XT_INDEX_PAGE_HEAD_SIZE;
	ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-XT_INDEX_PAGE_HEAD_SIZE);
	ASSERT_NS(block->cb_min_pos < block->cb_max_pos);
#endif
	ASSERT_NS(IDX_CAC_MODIFYABLE(block->cb_state));
	memcpy(block->cb_data, data, size);
	if (block->cb_state != IDX_CAC_BLOCK_DIRTY) {
		TRACK_BLOCK_WRITE(offset);
		xt_spinlock_lock(&ind->mi_dirty_lock);
		if ((block->cb_dirty_next = ind->mi_dirty_list))
			ind->mi_dirty_list->cb_dirty_prev = block;
		block->cb_dirty_prev = NULL;
		ind->mi_dirty_list = block;
		ind->mi_dirty_blocks++;
		xt_spinlock_unlock(&ind->mi_dirty_lock);
		if (block->cb_state != IDX_CAC_BLOCK_LOGGED) {
			ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
			ot->ot_thread->st_statistics.st_ind_cache_dirty++;
		}
		block->cb_state = IDX_CAC_BLOCK_DIRTY;
	}
	XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
	IDX_CAC_UNLOCK(seg, ot->ot_thread);
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed++;
#endif
#ifdef CHECK_BLOCK_TRAILERS
	check_block_trailers();
#endif
	return OK;
}

/*
 * Update the cache, if in RAM.
 */
xtPublic xtBool xt_ind_write_cache(XTOpenTablePtr ot, xtIndexNodeID address, size_t size, xtWord1 *data)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!ind_cac_get(ot, address, &seg, &block))
		return FAILED;

	if (block) {
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
		/* This should only be done to pages that are free, which
		 * are not on the dirty list, so they must be clean!
		 */
		ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
		memcpy(block->cb_data, data, size);

		XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
		IDX_CAC_UNLOCK(seg, ot->ot_thread);
	}

	return OK;
}

xtPublic xtBool xt_ind_get(XTOpenTablePtr ot, xtIndexNodeID address, XTIndReferencePtr iref)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!ind_cac_get(ot, address, &seg, &block))
		return FAILED;

	if (block) {
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
		ASSERT_NS(IDX_CAC_NOT_FREE(block->cb_state));
		IDX_CAC_UNLOCK(seg, ot->ot_thread);
		iref->ir_block = block;
		iref->ir_branch = (XTIdxBranchDPtr) block->cb_data;
	}
	else {
		iref->ir_block = NULL;
		iref->ir_branch = NULL;
	}
	iref->ir_xlock = TRUE;
	iref->ir_updated = FALSE;

	return OK;
}

/*
 * Note, this function may only be called if the block has
 * been freed.
 */
xtPublic xtBool xt_ind_free_block(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!ind_cac_get(ot, address, &seg, &block))
		return FAILED;
	if (block) {
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);

		if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
			if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
				XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
				IDX_CAC_UNLOCK(seg, ot->ot_thread);
				return FAILED;
			}
		}

		/* {PAGE-NO-IN-INDEX-FILE}
		 * This is the one exception to the rule that a block
		 * that is in the IDX_CAC_BLOCK_LOGGED state may be released
		 * from the cache!
		 */
		ASSERT_NS(IDX_CAC_MODIFYABLE(block->cb_state));

		if (block->cb_state == IDX_CAC_BLOCK_DIRTY) {
			/* Take the block off the dirty list: */
			xt_spinlock_lock(&ind->mi_dirty_lock);
			if (block->cb_dirty_next)
				block->cb_dirty_next->cb_dirty_prev = block->cb_dirty_prev;
			if (block->cb_dirty_prev)
				block->cb_dirty_prev->cb_dirty_next = block->cb_dirty_next;
			if (ind->mi_dirty_list == block)
				ind->mi_dirty_list = block->cb_dirty_next;
			ind->mi_dirty_blocks--;
			xt_spinlock_unlock(&ind->mi_dirty_lock);
			block->cb_state = IDX_CAC_BLOCK_CLEAN;
			ot->ot_thread->st_statistics.st_ind_cache_dirty--;
#ifdef IND_OPT_DATA_WRITTEN
			block->cb_header = FALSE;
			block->cb_min_pos = 0xFFFF;
			block->cb_max_pos = 0;
#endif
		}
		XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);

		IDX_CAC_UNLOCK(seg, ot->ot_thread);
	}

	return OK;
}

xtPublic xtBool xt_ind_read_bytes(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, size_t size, xtWord1 *data)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!(block = ind_cac_fetch(ot, ind, address, &seg, TRUE)))
		return FAILED;

	XT_IPAGE_READ_LOCK(&block->cb_lock);
	memcpy(data, block->cb_data, size);
	XT_IPAGE_UNLOCK(&block->cb_lock, FALSE);
	IDX_CAC_UNLOCK(seg, ot->ot_thread);
	return OK;
}

xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, XTPageLockType ltype, XTIndReferencePtr iref)
{
	register XTIndBlockPtr	block;
	DcSegmentPtr			seg;
	xtWord2					branch_size;
	u_int					rec_size;
	xtBool					xlock = FALSE;

#ifdef DEBUG
	ASSERT_NS(iref->ir_xlock == 2);
	ASSERT_NS(iref->ir_updated == 2);
#endif
	if (!(block = ind_cac_fetch(ot, ind, address, &seg, TRUE)))
		return FAILED;

	branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
	rec_size = XT_GET_INDEX_BLOCK_LEN(branch_size);
	if (rec_size < 2 || rec_size > XT_INDEX_PAGE_SIZE)
		goto failed_corrupt;
	if (ind->mi_fix_key) {
		rec_size -= 2;
		if (XT_IS_NODE(branch_size)) {
			if (rec_size != 0) {
				if (rec_size < XT_NODE_REF_SIZE)
					goto failed_corrupt;
				rec_size -= XT_NODE_REF_SIZE;
				if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE + XT_NODE_REF_SIZE)) != 0)
					goto failed_corrupt;
			}
		}
		else {
			if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE)) != 0)
				goto failed_corrupt;
		}
	}

	switch (ltype) {
		case XT_LOCK_READ:
			break;
		case XT_LOCK_WRITE:
			xlock = TRUE;
			break;
		case XT_XLOCK_LEAF:
			if (!XT_IS_NODE(branch_size))
				xlock = TRUE;
			break;
		case XT_XLOCK_DEL_LEAF:
			if (!XT_IS_NODE(branch_size)) {
				if (ot->ot_table->tab_dic.dic_no_lazy_delete)
					xlock = TRUE;
				else {
					/*
					 * {LAZY-DEL-INDEX-ITEMS}
					 *
					 * We are fetching a page for delete purposes.
					 * We decide here if we plan to do a lazy delete,
					 * or if we plan to compact the node.
					 *
					 * A lazy delete just requires a shared lock.
					 *
					 */
					if (ind->mi_lazy_delete) {
						/* If the number of deleted items is greater than
						 * half of the number of items that can fit in the
						 * page, then we will compact the node.
						 */
						if (!xt_idx_lazy_delete_on_leaf(ind, block, XT_GET_INDEX_BLOCK_LEN(branch_size)))
							xlock = TRUE;
					}
					else
						xlock = TRUE;
				}
			}
			break;
	}

	if ((iref->ir_xlock = xlock))
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
	else
		XT_IPAGE_READ_LOCK(&block->cb_lock);

	IDX_CAC_UNLOCK(seg, ot->ot_thread);

	/* {DIRECT-IO}
	 * Direct I/O requires that the buffer is 512 byte aligned.
	 * To do this, cb_data is turned into a pointer, instead
	 * of an array.
	 * As a result, we need to pass a pointer to both the
	 * cache block and the cache block data:
	 */
	iref->ir_updated = FALSE;
	iref->ir_block = block;
	iref->ir_branch = (XTIdxBranchDPtr) block->cb_data;
	return OK;

	failed_corrupt:
	IDX_CAC_UNLOCK(seg, ot->ot_thread);
	xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
	return FAILED;
}

xtPublic xtBool xt_ind_release(XTOpenTablePtr ot, XTIndexPtr ind, XTPageUnlockType XT_NDEBUG_UNUSED(utype), XTIndReferencePtr iref)
{
	register XTIndBlockPtr	block;

	block = iref->ir_block;

#ifdef DEBUG
	ASSERT_NS(iref->ir_xlock != 2);
	ASSERT_NS(iref->ir_updated != 2);
	if (iref->ir_updated)
		ASSERT_NS(utype == XT_UNLOCK_R_UPDATE || utype == XT_UNLOCK_W_UPDATE);
	else
		ASSERT_NS(utype == XT_UNLOCK_READ || utype == XT_UNLOCK_WRITE);
	if (iref->ir_xlock)
		ASSERT_NS(utype == XT_UNLOCK_WRITE || utype == XT_UNLOCK_W_UPDATE);
	else
		ASSERT_NS(utype == XT_UNLOCK_READ || utype == XT_UNLOCK_R_UPDATE);
#endif
	if (iref->ir_updated) {
#ifdef DEBUG
#ifdef IND_OPT_DATA_WRITTEN
		xtWord2	branch_size;
		u_int	rec_size;

		branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
		rec_size = XT_GET_INDEX_BLOCK_LEN(branch_size);

		ASSERT_NS(block->cb_min_pos <= rec_size-2);
		ASSERT_NS(block->cb_min_pos <= block->cb_max_pos);
		ASSERT_NS(block->cb_max_pos <= rec_size-2);
		ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
#endif
#endif
		/* The page was updated: */
		ASSERT_NS(IDX_CAC_MODIFYABLE(block->cb_state));
		if (block->cb_state != IDX_CAC_BLOCK_DIRTY) {
			TRACK_BLOCK_WRITE(offset);
			xt_spinlock_lock(&ind->mi_dirty_lock);
			if ((block->cb_dirty_next = ind->mi_dirty_list))
				ind->mi_dirty_list->cb_dirty_prev = block;
			block->cb_dirty_prev = NULL;
			ind->mi_dirty_list = block;
			ind->mi_dirty_blocks++;
			xt_spinlock_unlock(&ind->mi_dirty_lock);
			if (block->cb_state != IDX_CAC_BLOCK_LOGGED) {
				ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
				ot->ot_thread->st_statistics.st_ind_cache_dirty++;
			}
			block->cb_state = IDX_CAC_BLOCK_DIRTY;
		}
	}

	XT_IPAGE_UNLOCK(&block->cb_lock, iref->ir_xlock);
#ifdef DEBUG
	iref->ir_xlock = 2;
	iref->ir_updated = 2;
#endif
	return OK;
}

xtPublic xtBool xt_ind_reserve(XTOpenTablePtr ot, u_int count, XTIdxBranchDPtr not_this)
{
	register XTIndBlockPtr	block;
	register DcGlobalsRec	*dcg = &ind_cac_globals;

#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_reserved = count;
	ot->ot_ind_reads = 0;
#endif
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	while (ot->ot_ind_res_count < count) {
		if (!dcg->cg_free_list) {
			if (!ind_cac_free_lru_blocks(ot, count - ot->ot_ind_res_count, not_this)) {
				if (!dcg->cg_free_list) {
					xt_ind_free_reserved(ot);
					xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_INDEX_CACHE);
#ifdef DEBUG_CHECK_IND_CACHE
					xt_ind_check_cache(NULL);
#endif
					return FAILED;
				}
			}
		}

		/* Get a free block: */
		xt_lock_mutex_ns(&dcg->cg_lock);
		while (ot->ot_ind_res_count < count && (block = dcg->cg_free_list)) {
			ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FREE);
			dcg->cg_free_list = block->cb_next;
			dcg->cg_free_count--;
			block->cb_next = ot->ot_ind_res_bufs;
			ot->ot_ind_res_bufs = block;
			ot->ot_ind_res_count++;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_reserved_by_ots++;
#endif
		}
		xt_unlock_mutex_ns(&dcg->cg_lock);
	}
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return OK;
}
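
/* Reservation: before an index update that may need several new pages, the
 * caller uses xt_ind_reserve() to move up to 'count' free blocks onto its
 * private ot_ind_res_bufs list; ind_cac_fetch() consumes this list before
 * touching the shared free list. 'not_this' is the branch the caller already
 * has locked, so ind_cac_free_lru_blocks() will not try to free that block.
 * Leftover blocks are handed back by xt_ind_free_reserved().
 */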

xtPublic void xt_ind_free_reserved(XTOpenTablePtr ot)
{
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	if (ot->ot_ind_res_bufs) {
		register XTIndBlockPtr	block, fblock;
		register DcGlobalsRec	*dcg = &ind_cac_globals;

		xt_lock_mutex_ns(&dcg->cg_lock);
		block = ot->ot_ind_res_bufs;
		while (block) {
			fblock = block;
			block = block->cb_next;

			fblock->cb_next = dcg->cg_free_list;
			dcg->cg_free_list = fblock;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_reserved_by_ots--;
#endif
			dcg->cg_free_count++;
		}
		xt_unlock_mutex_ns(&dcg->cg_lock);
		ot->ot_ind_res_bufs = NULL;
		ot->ot_ind_res_count = 0;
	}
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
}

xtPublic void xt_ind_unreserve(XTOpenTablePtr ot)
{
	if (!ind_cac_globals.cg_free_list)
		xt_ind_free_reserved(ot);
}