~drizzle-trunk/drizzle/development

Viewing changes to plugin/pbxt/src/cache_xt.cc

Renamed more stuff to drizzle.
1
 
/* Copyright (c) 2005 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase XT
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 
 *
19
 
 * 2005-05-24   Paul McCullagh
20
 
 *
21
 
 * H&G2JCtL
22
 
 */
23
 
 
24
 
#include "xt_config.h"
25
 
 
26
 
#ifdef DRIZZLED
27
 
#include <bitset>
28
 
#endif
29
 
 
30
 
#ifndef XT_WIN
31
 
#include <unistd.h>
32
 
#endif
33
 
 
34
 
#include <stdio.h>
35
 
#include <time.h>
36
 
 
37
 
#include "pthread_xt.h"
38
 
#include "thread_xt.h"
39
 
#include "filesys_xt.h"
40
 
#include "cache_xt.h"
41
 
#include "table_xt.h"
42
 
#include "trace_xt.h"
43
 
#include "util_xt.h"
44
 
 
45
 
#define XT_TIME_DIFF(start, now) (\
46
 
        ((xtWord4) (now) < (xtWord4) (start)) ? \
47
 
        ((xtWord4) 0XFFFFFFFF - ((xtWord4) (start) - (xtWord4) (now))) : \
48
 
        ((xtWord4) (now) - (xtWord4) (start)))
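
/* A rough illustration of XT_TIME_DIFF (values chosen only as an example):
 * both arguments are wrapping 32-bit counters, and the macro returns the
 * forward distance from 'start' to 'now', which keeps the recent-use check
 * in ind_cac_fetch() working after cg_ru_now wraps past 0xFFFFFFFF:
 *
 *   xtWord4 start = 0xFFFFFFF0;                  // just before the wrap
 *   xtWord4 now   = 0x00000010;                  // just after the wrap
 *   xtWord4 age   = XT_TIME_DIFF(start, now);    // 0xFFFFFFFF - 0xFFFFFFE0 = 0x1F
 *   xtWord4 age2  = XT_TIME_DIFF(0x100, 0x180);  // no wrap: 0x80
 */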
49
 
 
50
 
/*
51
 
 * -----------------------------------------------------------------------
52
 
 * D I S K   C A C H E
53
 
 */
54
 
 
55
 
#define IDX_CAC_SEGMENT_COUNT           ((off_t) 1 << XT_INDEX_CACHE_SEGMENT_SHIFTS)
56
 
#define IDX_CAC_SEGMENT_MASK            (IDX_CAC_SEGMENT_COUNT - 1)
57
 
 
58
 
#ifdef XT_NO_ATOMICS
59
 
#define IDX_CAC_USE_PTHREAD_RW
60
 
#else
61
 
//#define IDX_CAC_USE_PTHREAD_RW
62
 
#define IDX_CAC_USE_XSMUTEX
63
 
//#define IDX_USE_SPINXSLOCK
64
 
#endif
65
 
 
66
 
#if defined(IDX_CAC_USE_PTHREAD_RW)
67
 
#define IDX_CAC_LOCK_TYPE                               xt_rwlock_type
68
 
#define IDX_CAC_INIT_LOCK(s, i)                 xt_init_rwlock_with_autoname(s, &(i)->cs_lock)
69
 
#define IDX_CAC_FREE_LOCK(s, i)                 xt_free_rwlock(&(i)->cs_lock)   
70
 
#define IDX_CAC_READ_LOCK(i, o)                 xt_slock_rwlock_ns(&(i)->cs_lock)
71
 
#define IDX_CAC_WRITE_LOCK(i, o)                xt_xlock_rwlock_ns(&(i)->cs_lock)
72
 
#define IDX_CAC_UNLOCK(i, o)                    xt_unlock_rwlock_ns(&(i)->cs_lock)
73
 
#elif defined(IDX_CAC_USE_XSMUTEX)
74
 
#define IDX_CAC_LOCK_TYPE                               XTMutexXSLockRec
75
 
#define IDX_CAC_INIT_LOCK(s, i)                 xt_xsmutex_init_with_autoname(s, &(i)->cs_lock)
76
 
#define IDX_CAC_FREE_LOCK(s, i)                 xt_xsmutex_free(s, &(i)->cs_lock)       
77
 
#define IDX_CAC_READ_LOCK(i, o)                 xt_xsmutex_slock(&(i)->cs_lock, (o)->t_id)
78
 
#define IDX_CAC_WRITE_LOCK(i, o)                xt_xsmutex_xlock(&(i)->cs_lock, (o)->t_id)
79
 
#define IDX_CAC_UNLOCK(i, o)                    xt_xsmutex_unlock(&(i)->cs_lock, (o)->t_id)
80
 
#elif defined(IDX_CAC_USE_SPINXSLOCK)
81
 
#define IDX_CAC_LOCK_TYPE                               XTSpinXSLockRec
82
 
#define IDX_CAC_INIT_LOCK(s, i)                 xt_spinxslock_init_with_autoname(s, &(i)->cs_lock)
83
 
#define IDX_CAC_FREE_LOCK(s, i)                 xt_spinxslock_free(s, &(i)->cs_lock)    
84
 
#define IDX_CAC_READ_LOCK(i, s)                 xt_spinxslock_slock(&(i)->cs_lock, (s)->t_id)
85
 
#define IDX_CAC_WRITE_LOCK(i, s)                xt_spinxslock_xlock(&(i)->cs_lock, FALSE, (s)->t_id)
86
 
#define IDX_CAC_UNLOCK(i, s)                    xt_spinxslock_unlock(&(i)->cs_lock, (s)->t_id)
87
 
#else
88
 
#error Please define the lock type
89
 
#endif
90
 
 
91
 
#ifdef XT_NO_ATOMICS
92
 
#define ID_HANDLE_USE_PTHREAD_RW
93
 
#else
94
 
//#define ID_HANDLE_USE_PTHREAD_RW
95
 
#define ID_HANDLE_USE_SPINLOCK
96
 
#endif
97
 
 
98
 
#if defined(ID_HANDLE_USE_PTHREAD_RW)
99
 
#define ID_HANDLE_LOCK_TYPE                             xt_mutex_type
100
 
#define ID_HANDLE_INIT_LOCK(s, i)               xt_init_mutex_with_autoname(s, i)
101
 
#define ID_HANDLE_FREE_LOCK(s, i)               xt_free_mutex(i)        
102
 
#define ID_HANDLE_LOCK(i)                               xt_lock_mutex_ns(i)
103
 
#define ID_HANDLE_UNLOCK(i)                             xt_unlock_mutex_ns(i)
104
 
#elif defined(ID_HANDLE_USE_SPINLOCK)
105
 
#define ID_HANDLE_LOCK_TYPE                             XTSpinLockRec
106
 
#define ID_HANDLE_INIT_LOCK(s, i)               xt_spinlock_init_with_autoname(s, i)
107
 
#define ID_HANDLE_FREE_LOCK(s, i)               xt_spinlock_free(s, i)  
108
 
#define ID_HANDLE_LOCK(i)                               xt_spinlock_lock(i)
109
 
#define ID_HANDLE_UNLOCK(i)                             xt_spinlock_unlock(i)
110
 
#endif
111
 
 
112
 
#define XT_HANDLE_SLOTS                                 37
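
/* Index handles are kept on XT_HANDLE_SLOTS separately locked lists; a
 * handle's slot is chosen by cb_address % XT_HANDLE_SLOTS (see
 * xt_ind_get_handle() and xt_ind_release_handle() below), so threads
 * working on different blocks usually take different hs_handles_lock locks.
 */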
113
 
 
114
 
/*
115
 
#ifdef DEBUG
116
 
#define XT_INIT_HANDLE_COUNT                    0
117
 
#define XT_INIT_HANDLE_BLOCKS                   0
118
 
#else
119
 
#define XT_INIT_HANDLE_COUNT                    40
120
 
#define XT_INIT_HANDLE_BLOCKS                   10
121
 
#endif
122
 
*/
123
 
 
124
 
/* A disk cache segment. The cache is divided into a number of segments
125
 
 * to improve concurrency.
126
 
 */
127
 
typedef struct DcSegment {
128
 
        IDX_CAC_LOCK_TYPE       cs_lock;                                                /* The cache segment lock. */
129
 
        XTIndBlockPtr           *cs_hash_table;
130
 
} DcSegmentRec, *DcSegmentPtr;
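
/* A sketch of how a block is located (the scheme used by ind_cac_fetch(),
 * ind_cac_get() and ind_free_block() below): the node address and file ID
 * are folded into one hash value; the low bits select the segment (and with
 * it the cs_lock to take), and the remaining bits select the chain inside
 * that segment's hash table:
 *
 *   u_int         h     = XT_NODE_ID(address) + (file_id * 223);
 *   DcSegmentPtr  seg   = &ind_cac_globals.cg_segment[h & IDX_CAC_SEGMENT_MASK];
 *   u_int         idx   = (h >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % ind_cac_globals.cg_hash_size;
 *   XTIndBlockPtr block = seg->cs_hash_table[idx];    // then walk the cb_next chain
 */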
131
 
 
132
 
typedef struct DcHandleSlot {
133
 
        ID_HANDLE_LOCK_TYPE     hs_handles_lock;
134
 
        XTIndHandleBlockPtr     hs_free_blocks;
135
 
        XTIndHandlePtr          hs_free_handles;
136
 
        XTIndHandlePtr          hs_used_handles;
137
 
} DcHandleSlotRec, *DcHandleSlotPtr;
138
 
 
139
 
typedef struct DcGlobals {
140
 
        xt_mutex_type           cg_lock;                                                /* The public cache lock. */
141
 
        DcSegmentRec            cg_segment[IDX_CAC_SEGMENT_COUNT];
142
 
        XTIndBlockPtr           cg_blocks;
143
 
#ifdef XT_USE_DIRECT_IO_ON_INDEX
144
 
        xtWord1                         *cg_buffer;
145
 
#endif
146
 
        XTIndBlockPtr           cg_free_list;
147
 
        xtWord4                         cg_free_count;
148
 
        xtWord4                         cg_ru_now;                                              /* A counter as described by Jim Starkey (my thanks) */
149
 
        XTIndBlockPtr           cg_lru_block;
150
 
        XTIndBlockPtr           cg_mru_block;
151
 
        xtWord4                         cg_hash_size;
152
 
        xtWord4                         cg_block_count;
153
 
        xtWord4                         cg_max_free;
154
 
#ifdef DEBUG_CHECK_IND_CACHE
155
 
        u_int                           cg_reserved_by_ots;                             /* Number of blocks reserved by open tables. */
156
 
        u_int                           cg_read_count;                                  /* Number of blocks being read. */
157
 
#endif
158
 
 
159
 
        /* Index cache handles: */
160
 
        DcHandleSlotRec         cg_handle_slot[XT_HANDLE_SLOTS];
161
 
} DcGlobalsRec;
162
 
 
163
 
static DcGlobalsRec     ind_cac_globals;
164
 
 
165
 
#ifdef XT_USE_MYSYS
166
 
#ifdef xtPublic
167
 
#undef xtPublic
168
 
#endif
169
 
#include "my_global.h"
170
 
#include "my_sys.h"
171
 
#include "keycache.h"
172
 
KEY_CACHE my_cache;
173
 
#undef  pthread_rwlock_rdlock
174
 
#undef  pthread_rwlock_wrlock
175
 
#undef  pthread_rwlock_try_wrlock
176
 
#undef  pthread_rwlock_unlock
177
 
#undef  pthread_mutex_lock
178
 
#undef  pthread_mutex_unlock
179
 
#undef  pthread_cond_wait
180
 
#undef  pthread_cond_broadcast
181
 
#undef  xt_mutex_type
182
 
#define xtPublic
183
 
#endif
184
 
 
185
 
/*
186
 
 * -----------------------------------------------------------------------
187
 
 * INDEX CACHE HANDLES
188
 
 */
189
 
 
190
 
static XTIndHandlePtr ind_alloc_handle()
191
 
{
192
 
        XTIndHandlePtr handle;
193
 
 
194
 
        if (!(handle = (XTIndHandlePtr) xt_calloc_ns(sizeof(XTIndHandleRec))))
195
 
                return NULL;
196
 
        xt_spinlock_init_with_autoname(NULL, &handle->ih_lock);
197
 
        return handle;
198
 
}
199
 
 
200
 
static void ind_free_handle(XTIndHandlePtr handle)
201
 
{
202
 
        xt_spinlock_free(NULL, &handle->ih_lock);
203
 
        xt_free_ns(handle);
204
 
}
205
 
 
206
 
static void ind_handle_exit(XTThreadPtr self)
207
 
{
208
 
        DcHandleSlotPtr         hs;
209
 
        XTIndHandlePtr          handle;
210
 
        XTIndHandleBlockPtr     hptr;
211
 
 
212
 
        for (int i=0; i<XT_HANDLE_SLOTS; i++) {
213
 
                hs = &ind_cac_globals.cg_handle_slot[i];
214
 
 
215
 
                while (hs->hs_used_handles) {
216
 
                        handle = hs->hs_used_handles;
217
 
                        xt_ind_release_handle(handle, FALSE, self);
218
 
                }
219
 
 
220
 
                while (hs->hs_free_blocks) {
221
 
                        hptr = hs->hs_free_blocks;
222
 
                        hs->hs_free_blocks = hptr->hb_next;
223
 
                        xt_free(self, hptr);
224
 
                }
225
 
 
226
 
                while (hs->hs_free_handles) {
227
 
                        handle = hs->hs_free_handles;
228
 
                        hs->hs_free_handles = handle->ih_next;
229
 
                        ind_free_handle(handle);
230
 
                }
231
 
 
232
 
                ID_HANDLE_FREE_LOCK(self, &hs->hs_handles_lock);
233
 
        }
234
 
}
235
 
 
236
 
static void ind_handle_init(XTThreadPtr self)
237
 
{
238
 
        DcHandleSlotPtr         hs;
239
 
 
240
 
        for (int i=0; i<XT_HANDLE_SLOTS; i++) {
241
 
                hs = &ind_cac_globals.cg_handle_slot[i];
242
 
                memset(hs, 0, sizeof(DcHandleSlotRec));
243
 
                ID_HANDLE_INIT_LOCK(self, &hs->hs_handles_lock);
244
 
        }
245
 
}
246
 
 
247
 
//#define CHECK_HANDLE_STRUCTS
248
 
 
249
 
#ifdef CHECK_HANDLE_STRUCTS
250
 
static int gdummy = 0;
251
 
 
252
 
static void ic_stop_here()
253
 
{
254
 
        gdummy = gdummy + 1;
255
 
        printf("Nooo %d!\n", gdummy);
256
 
}
257
 
 
258
 
static void ic_check_handle_structs()
259
 
{
260
 
        XTIndHandlePtr          handle, phandle;
261
 
        XTIndHandleBlockPtr     hptr, phptr;
262
 
        int                                     count = 0;
263
 
        int                                     ctest;
264
 
 
265
 
        phandle = NULL;
266
 
        handle = ind_cac_globals.cg_used_handles;
267
 
        while (handle) {
268
 
                if (handle == phandle)
269
 
                        ic_stop_here();
270
 
                if (handle->ih_prev != phandle)
271
 
                        ic_stop_here();
272
 
                if (handle->ih_cache_reference) {
273
 
                        ctest = handle->x.ih_cache_block->cb_handle_count;
274
 
                        if (ctest == 0 || ctest > 100)
275
 
                                ic_stop_here();
276
 
                }
277
 
                else {
278
 
                        ctest = handle->x.ih_handle_block->hb_ref_count;
279
 
                        if (ctest == 0 || ctest > 100)
280
 
                                ic_stop_here();
281
 
                }
282
 
                phandle = handle;
283
 
                handle = handle->ih_next;
284
 
                count++;
285
 
                if (count > 1000)
286
 
                        ic_stop_here();
287
 
        }
288
 
 
289
 
        count = 0;
290
 
        hptr = ind_cac_globals.cg_free_blocks;
291
 
        while (hptr) {
292
 
                if (hptr == phptr)
293
 
                        ic_stop_here();
294
 
                phptr = hptr;
295
 
                hptr = hptr->hb_next;
296
 
                count++;
297
 
                if (count > 1000)
298
 
                        ic_stop_here();
299
 
        }
300
 
 
301
 
        count = 0;
302
 
        handle = ind_cac_globals.cg_free_handles;
303
 
        while (handle) {
304
 
                if (handle == phandle)
305
 
                        ic_stop_here();
306
 
                phandle = handle;
307
 
                handle = handle->ih_next;
308
 
                count++;
309
 
                if (count > 1000)
310
 
                        ic_stop_here();
311
 
        }
312
 
}
313
 
#endif
314
 
 
315
 
/*
316
 
 * Get a handle to the index block.
317
 
 * This function is called by index scanners (readers).
318
 
 */
319
 
xtPublic XTIndHandlePtr xt_ind_get_handle(XTOpenTablePtr ot, XTIndexPtr ind, XTIndReferencePtr iref)
320
 
{
321
 
        DcHandleSlotPtr hs;
322
 
        XTIndHandlePtr  handle;
323
 
 
324
 
        hs = &ind_cac_globals.cg_handle_slot[iref->ir_block->cb_address % XT_HANDLE_SLOTS];
325
 
 
326
 
        ASSERT_NS(iref->ir_xlock == FALSE);
327
 
        ASSERT_NS(iref->ir_updated == FALSE);
328
 
        ID_HANDLE_LOCK(&hs->hs_handles_lock);
329
 
#ifdef CHECK_HANDLE_STRUCTS
330
 
        ic_check_handle_structs();
331
 
#endif
332
 
        if ((handle = hs->hs_free_handles))
333
 
                hs->hs_free_handles = handle->ih_next;
334
 
        else {
335
 
                if (!(handle = ind_alloc_handle())) {
336
 
                        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
337
 
                        xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
338
 
                        return NULL;
339
 
                }
340
 
        }
341
 
        if (hs->hs_used_handles)
342
 
                hs->hs_used_handles->ih_prev = handle;
343
 
        handle->ih_next = hs->hs_used_handles;
344
 
        handle->ih_prev = NULL;
345
 
        handle->ih_address = iref->ir_block->cb_address;
346
 
        handle->ih_cache_reference = TRUE;
347
 
        handle->x.ih_cache_block = iref->ir_block;
348
 
        handle->ih_branch = iref->ir_branch;
349
 
        /* {HANDLE-COUNT-USAGE}
350
 
         * This is safe because:
351
 
         *
352
 
         * I have an Slock on the cache block, and I have
353
 
         * at least an Slock on the index.
354
 
         * So this excludes anyone who is reading 
355
 
         * cb_handle_count in the index.
356
 
         * (all cache block writers, and the freer).
357
 
         *
358
 
         * The increment is safe because I have the list
359
 
         * lock (hs_handles_lock), which is required by anyone else
360
 
         * who increments or decrements this value.
361
 
         */
362
 
        iref->ir_block->cb_handle_count++;
363
 
        hs->hs_used_handles = handle;
364
 
#ifdef CHECK_HANDLE_STRUCTS
365
 
        ic_check_handle_structs();
366
 
#endif
367
 
        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
368
 
        xt_ind_release(ot, ind, XT_UNLOCK_READ, iref);
369
 
        return handle;
370
 
}
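
/* A sketch of how a reader might pair these calls (iref is assumed to hold
 * an S-locked cache block obtained from the cache layer):
 *
 *   XTIndHandlePtr handle;
 *
 *   if (!(handle = xt_ind_get_handle(ot, ind, &iref)))
 *       return FAILED;              // iref has already been released
 *   // ... read via handle->ih_branch; if the block is modified meanwhile,
 *   // xt_ind_copy_on_write() silently redirects the handle to a copy ...
 *   xt_ind_release_handle(handle, FALSE, ot->ot_thread);
 */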
371
 
 
372
 
xtPublic void xt_ind_release_handle(XTIndHandlePtr handle, xtBool have_lock, XTThreadPtr thread)
373
 
{
374
 
        DcHandleSlotPtr hs;
375
 
        XTIndBlockPtr   block = NULL;
376
 
        u_int                   hash_idx = 0;
377
 
        DcSegmentPtr    seg = NULL;
378
 
        XTIndBlockPtr   xblock;
379
 
 
380
 
        (void) thread; /*DRIZZLED*/
381
 
 
382
 
        /* The lock order is:
383
 
         * 1. Cache segment (cs_lock) - This is only taken by ind_free_block()!
384
 
         * 1. S/Slock cache block (cb_lock)
385
 
         * 2. List lock (cg_handles_lock).
386
 
         * 3. Handle lock (ih_lock)
387
 
         */
388
 
        if (!have_lock)
389
 
                xt_spinlock_lock(&handle->ih_lock);
390
 
 
391
 
        /* Get the lock on the cache page if required: */
392
 
        if (handle->ih_cache_reference) {
393
 
                u_int                   file_id;
394
 
                xtIndexNodeID   address;
395
 
 
396
 
                block = handle->x.ih_cache_block;
397
 
 
398
 
                file_id = block->cb_file_id;
399
 
                address = block->cb_address;
400
 
                hash_idx = XT_NODE_ID(address) + (file_id * 223);
401
 
                seg = &ind_cac_globals.cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
402
 
                hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % ind_cac_globals.cg_hash_size;
403
 
        }
404
 
 
405
 
        xt_spinlock_unlock(&handle->ih_lock);
406
 
 
407
 
        /* Because of the lock order, I have to release the
408
 
         * handle before I get a lock on the cache block.
409
 
         *
410
 
         * But, by doing this, this cache block may be gone!
411
 
         */
412
 
        if (block) {
413
 
                IDX_CAC_READ_LOCK(seg, thread);
414
 
                xblock = seg->cs_hash_table[hash_idx];
415
 
                while (xblock) {
416
 
                        if (block == xblock) {
417
 
                                /* Found the block... 
418
 
                                 * {HANDLE-COUNT-SLOCK}
419
 
                                 * 04.05.2009, changed to slock.
420
 
                                 * The xlock causes too much contention
421
 
                                 * on the cache block for read only loads.
422
 
                                 *
423
 
                                 * Is it safe?
424
 
                                 * See below...
425
 
                                 */
426
 
                                XT_IPAGE_READ_LOCK(&block->cb_lock);
427
 
                                goto block_found;
428
 
                        }
429
 
                        xblock = xblock->cb_next;
430
 
                }
431
 
                block = NULL;
432
 
                block_found:
433
 
                IDX_CAC_UNLOCK(seg, thread);
434
 
        }
435
 
 
436
 
        hs = &ind_cac_globals.cg_handle_slot[handle->ih_address % XT_HANDLE_SLOTS];
437
 
 
438
 
        ID_HANDLE_LOCK(&hs->hs_handles_lock);
439
 
#ifdef CHECK_HANDLE_STRUCTS
440
 
        ic_check_handle_structs();
441
 
#endif
442
 
 
443
 
        /* I don't need to lock the handle because I have locked
444
 
         * the list, and no other thread can change the
445
 
         * handle without first getting a lock on the list.
446
 
         *
447
 
         * In addition, the caller is the only owner of the
448
 
         * handle, and the only thread with an independent
449
 
         * reference to the handle.
450
 
         * All other access occurs through the list.
451
 
         */
452
 
 
453
 
        /* Remove the reference to the cache or a handle block: */
454
 
        if (handle->ih_cache_reference) {
455
 
                ASSERT_NS(block == handle->x.ih_cache_block);
456
 
                ASSERT_NS(block && block->cb_handle_count > 0);
457
 
                /* {HANDLE-COUNT-USAGE}
458
 
                 * This is safe here because I have excluded
459
 
                 * all readers by taking an Xlock on the
460
 
                 * cache block (CHANGED - see below).
461
 
                 *
462
 
                 * {HANDLE-COUNT-SLOCK}
463
 
                 * 04.05.2009, changed to slock.
464
 
                 * Should be OK, because:
465
 
         * I have a lock on the list lock (hs_handles_lock),
466
 
                 * which prevents concurrent updates to cb_handle_count.
467
 
                 *
468
 
         * I also have a read lock on the cache block
469
 
                 * but not a lock on the index. As a result, we cannot
470
 
         * exclude all index writers (and readers of
471
 
         * cb_handle_count).
472
 
                 */
473
 
                block->cb_handle_count--;
474
 
        }
475
 
        else {
476
 
                XTIndHandleBlockPtr     hptr = handle->x.ih_handle_block;
477
 
 
478
 
                ASSERT_NS(!handle->ih_cache_reference);
479
 
                ASSERT_NS(hptr->hb_ref_count > 0);
480
 
                hptr->hb_ref_count--;
481
 
                if (!hptr->hb_ref_count) {
482
 
                        /* Put it back on the free list: */
483
 
                        hptr->hb_next = hs->hs_free_blocks;
484
 
                        hs->hs_free_blocks = hptr;
485
 
                }
486
 
        }
487
 
 
488
 
        /* Unlink the handle: */
489
 
        if (handle->ih_next)
490
 
                handle->ih_next->ih_prev = handle->ih_prev;
491
 
        if (handle->ih_prev)
492
 
                handle->ih_prev->ih_next = handle->ih_next;
493
 
        if (hs->hs_used_handles == handle)
494
 
                hs->hs_used_handles = handle->ih_next;
495
 
 
496
 
        /* Put it on the free list: */
497
 
        handle->ih_next = hs->hs_free_handles;
498
 
        hs->hs_free_handles = handle;
499
 
 
500
 
#ifdef CHECK_HANDLE_STRUCTS
501
 
        ic_check_handle_structs();
502
 
#endif
503
 
        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
504
 
 
505
 
        if (block)
506
 
                XT_IPAGE_UNLOCK(&block->cb_lock, FALSE);
507
 
}
508
 
 
509
 
/* Call this function before a referenced cache block is modified!
510
 
 * This function is called by index updaters.
511
 
 */
512
 
xtPublic xtBool xt_ind_copy_on_write(XTIndReferencePtr iref)
513
 
{
514
 
        DcHandleSlotPtr         hs;
515
 
        XTIndHandleBlockPtr     hptr;
516
 
        u_int                           branch_size;
517
 
        XTIndHandlePtr          handle;
518
 
        u_int                           i = 0;
519
 
 
520
 
        hs = &ind_cac_globals.cg_handle_slot[iref->ir_block->cb_address % XT_HANDLE_SLOTS];
521
 
 
522
 
        ID_HANDLE_LOCK(&hs->hs_handles_lock);
523
 
 
524
 
        /* {HANDLE-COUNT-USAGE}
525
 
         * This is only called by updaters of this index block, or
526
 
         * the freer, which holds an Xlock on the index block.
527
 
         * These are all mutually exclusive for the index block.
528
 
         *
529
 
         * {HANDLE-COUNT-SLOCK}
530
 
         * Do this check again, after we have the list lock (hs_handles_lock).
531
 
         * There is a small chance that the count has changed since we last
532
 
         * checked, because xt_ind_release_handle() only holds
533
 
         * an slock on the index page.
534
 
         *
535
 
         * An updater can sometimes have an XLOCK on the index and an slock
536
 
         * on the cache block. In this case xt_ind_release_handle()
537
 
         * could have run through.
538
 
         */
539
 
        if (!iref->ir_block->cb_handle_count) {
540
 
                ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
541
 
                return OK;
542
 
        }
543
 
 
544
 
#ifdef CHECK_HANDLE_STRUCTS
545
 
        ic_check_handle_structs();
546
 
#endif
547
 
        if ((hptr = hs->hs_free_blocks))
548
 
                hs->hs_free_blocks = hptr->hb_next;
549
 
        else {
550
 
                if (!(hptr = (XTIndHandleBlockPtr) xt_malloc_ns(sizeof(XTIndHandleBlockRec)))) {
551
 
                        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
552
 
                        return FAILED;
553
 
                }
554
 
        }
555
 
 
556
 
        branch_size = XT_GET_INDEX_BLOCK_LEN(XT_GET_DISK_2(iref->ir_branch->tb_size_2));
557
 
        memcpy(&hptr->hb_branch, iref->ir_branch, branch_size);
558
 
        hptr->hb_ref_count = iref->ir_block->cb_handle_count;
559
 
 
560
 
        handle = hs->hs_used_handles;
561
 
        while (handle) {
562
 
                if (handle->ih_branch == iref->ir_branch) {
563
 
                        i++;
564
 
                        xt_spinlock_lock(&handle->ih_lock);
565
 
                        ASSERT_NS(handle->ih_cache_reference);
566
 
                        handle->ih_cache_reference = FALSE;
567
 
                        handle->x.ih_handle_block = hptr;
568
 
                        handle->ih_branch = &hptr->hb_branch;
569
 
                        xt_spinlock_unlock(&handle->ih_lock);
570
 
#ifndef DEBUG
571
 
                        if (i == hptr->hb_ref_count)
572
 
                                break;
573
 
#endif
574
 
                }
575
 
                handle = handle->ih_next;
576
 
        }
577
 
#ifdef DEBUG
578
 
        ASSERT_NS(hptr->hb_ref_count == i);
579
 
#endif
580
 
        /* {HANDLE-COUNT-USAGE}
581
 
         * It is safe to modify cb_handle_count when I have the
582
 
         * list lock, and I have excluded all readers!
583
 
         */
584
 
        iref->ir_block->cb_handle_count = 0;
585
 
#ifdef CHECK_HANDLE_STRUCTS
586
 
        ic_check_handle_structs();
587
 
#endif
588
 
        ID_HANDLE_UNLOCK(&hs->hs_handles_lock);
589
 
 
590
 
        return OK;
591
 
}
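
/* A sketch of the caller-side pattern (the same shape as in ind_free_block()
 * below): while holding an exclusive lock on the cache block, any open
 * handles are redirected to a private copy before the block is changed:
 *
 *   if (block->cb_handle_count) {
 *       XTIndReferenceRec iref;
 *
 *       iref.ir_xlock   = TRUE;
 *       iref.ir_updated = FALSE;
 *       iref.ir_block   = block;
 *       iref.ir_branch  = (XTIdxBranchDPtr) block->cb_data;
 *       if (!xt_ind_copy_on_write(&iref))
 *           return FAILED;          // no memory for the handle copy
 *   }
 */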
592
 
 
593
 
xtPublic void xt_ind_lock_handle(XTIndHandlePtr handle)
594
 
{
595
 
        xt_spinlock_lock(&handle->ih_lock);
596
 
}
597
 
 
598
 
xtPublic void xt_ind_unlock_handle(XTIndHandlePtr handle)
599
 
{
600
 
        xt_spinlock_unlock(&handle->ih_lock);
601
 
}
602
 
 
603
 
/*
604
 
 * -----------------------------------------------------------------------
605
 
 * INIT/EXIT
606
 
 */
607
 
 
608
 
/*
609
 
 * Initialize the disk cache.
610
 
 */
611
 
xtPublic void xt_ind_init(XTThreadPtr self, size_t cache_size)
612
 
{
613
 
        XTIndBlockPtr   block;
614
 
 
615
 
#ifdef XT_USE_MYSYS
616
 
        init_key_cache(&my_cache, 1024, cache_size, 100, 300);
617
 
#endif
618
 
        /* Memory is devoted to the page data alone; I no longer count the size of the directory,
619
 
         * or the page overhead: */
620
 
        ind_cac_globals.cg_block_count = cache_size / XT_INDEX_PAGE_SIZE;
621
 
        ind_cac_globals.cg_hash_size = ind_cac_globals.cg_block_count / (IDX_CAC_SEGMENT_COUNT >> 1);
622
 
        ind_cac_globals.cg_max_free = ind_cac_globals.cg_block_count / 10;
623
 
        if (ind_cac_globals.cg_max_free < 8)
624
 
                ind_cac_globals.cg_max_free = 8;
625
 
        if (ind_cac_globals.cg_max_free > 128)
626
 
                ind_cac_globals.cg_max_free = 128;
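        /* A worked example with illustrative values (XT_INDEX_PAGE_SIZE and
         * XT_INDEX_CACHE_SEGMENT_SHIFTS are defined in the headers): a 32MB
         * cache with 16K pages gives cg_block_count = 2048; with 8 segments
         * each per-segment hash table gets 2048 / (8 >> 1) = 512 buckets;
         * and cg_max_free = 2048 / 10 = 204, which is clamped here to 128.
         */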
627
 
 
628
 
        try_(a) {
629
 
                for (u_int i=0; i<IDX_CAC_SEGMENT_COUNT; i++) {
630
 
                        ind_cac_globals.cg_segment[i].cs_hash_table = (XTIndBlockPtr *) xt_calloc(self, ind_cac_globals.cg_hash_size * sizeof(XTIndBlockPtr));
631
 
                        IDX_CAC_INIT_LOCK(self, &ind_cac_globals.cg_segment[i]);
632
 
                }
633
 
 
634
 
                block = (XTIndBlockPtr) xt_malloc(self, ind_cac_globals.cg_block_count * sizeof(XTIndBlockRec));
635
 
                ind_cac_globals.cg_blocks = block;
636
 
                xt_init_mutex_with_autoname(self, &ind_cac_globals.cg_lock);
637
 
#ifdef XT_USE_DIRECT_IO_ON_INDEX
638
 
                xtWord1 *buffer;
639
 
#ifdef XT_WIN
640
 
                size_t  psize = 512;
641
 
#else
642
 
                size_t  psize = getpagesize();
643
 
#endif
644
 
                size_t  diff;
645
 
 
646
 
                buffer = (xtWord1 *) xt_malloc(self, (ind_cac_globals.cg_block_count * XT_INDEX_PAGE_SIZE));
647
 
                diff = (size_t) buffer % psize;
648
 
                if (diff != 0) {
649
 
                        xt_free(self, buffer);
650
 
                        buffer = (xtWord1 *) xt_malloc(self, (ind_cac_globals.cg_block_count * XT_INDEX_PAGE_SIZE) + psize);
651
 
                        diff = (size_t) buffer % psize;
652
 
                        if (diff != 0)
653
 
                                diff = psize - diff;
654
 
                }
655
 
                ind_cac_globals.cg_buffer = buffer;
656
 
                buffer += diff;
657
 
#endif
658
 
 
659
 
                for (u_int i=0; i<ind_cac_globals.cg_block_count; i++) {
660
 
                        XT_IPAGE_INIT_LOCK(self, &block->cb_lock);
661
 
                        block->cb_state = IDX_CAC_BLOCK_FREE;
662
 
                        block->cb_next = ind_cac_globals.cg_free_list;
663
 
#ifdef XT_USE_DIRECT_IO_ON_INDEX
664
 
                        block->cb_data = buffer;
665
 
                        buffer += XT_INDEX_PAGE_SIZE;
666
 
#endif
667
 
#ifdef CHECK_BLOCK_TRAILERS
668
 
                        XT_SET_DISK_4(block->cp_check, 0xDEADBEEF);
669
 
#endif
670
 
                        ind_cac_globals.cg_free_list = block;
671
 
                        block++;
672
 
                }
673
 
                ind_cac_globals.cg_free_count = ind_cac_globals.cg_block_count;
674
 
#ifdef DEBUG_CHECK_IND_CACHE
675
 
                ind_cac_globals.cg_reserved_by_ots = 0;
676
 
#endif
677
 
                ind_handle_init(self);
678
 
        }
679
 
        catch_(a) {
680
 
                xt_ind_exit(self);
681
 
                throw_();
682
 
        }
683
 
        cont_(a);
684
 
}
685
 
 
686
 
#ifdef CHECK_BLOCK_TRAILERS
687
 
xtPublic void check_block_trailers()
688
 
{
689
 
        XTIndBlockPtr   block;
690
 
 
691
 
        block = ind_cac_globals.cg_blocks;
692
 
        for (u_int i=0; i<ind_cac_globals.cg_block_count; i++) {
693
 
                ASSERT_NS(XT_GET_DISK_4(block->cp_check) == 0xDEADBEEF);
694
 
                block++;
695
 
        }
696
 
}
697
 
#endif
698
 
 
699
 
xtPublic void xt_ind_exit(XTThreadPtr self)
700
 
{
701
 
#ifdef XT_USE_MYSYS
702
 
        end_key_cache(&my_cache, 1);
703
 
#endif
704
 
        for (u_int i=0; i<IDX_CAC_SEGMENT_COUNT; i++) {
705
 
                if (ind_cac_globals.cg_segment[i].cs_hash_table) {
706
 
                        xt_free(self, ind_cac_globals.cg_segment[i].cs_hash_table);
707
 
                        ind_cac_globals.cg_segment[i].cs_hash_table = NULL;
708
 
                        IDX_CAC_FREE_LOCK(self, &ind_cac_globals.cg_segment[i]);
709
 
                }
710
 
        }
711
 
 
712
 
        /* Must be done before freeing the blocks! */
713
 
        ind_handle_exit(self);
714
 
 
715
 
        if (ind_cac_globals.cg_blocks) {
716
 
                xt_free(self, ind_cac_globals.cg_blocks);
717
 
                ind_cac_globals.cg_blocks = NULL;
718
 
                xt_free_mutex(&ind_cac_globals.cg_lock);
719
 
        }
720
 
#ifdef XT_USE_DIRECT_IO_ON_INDEX
721
 
        if (ind_cac_globals.cg_buffer) {
722
 
                xt_free(self, ind_cac_globals.cg_buffer);
723
 
                ind_cac_globals.cg_buffer = NULL;
724
 
        }
725
 
#endif
726
 
 
727
 
        memset(&ind_cac_globals, 0, sizeof(ind_cac_globals));
728
 
}
729
 
 
730
 
xtPublic xtInt8 xt_ind_get_usage()
731
 
{
732
 
        xtInt8 size = 0;
733
 
 
734
 
        size = (xtInt8) (ind_cac_globals.cg_block_count - ind_cac_globals.cg_free_count) * (xtInt8) XT_INDEX_PAGE_SIZE;
735
 
        return size;
736
 
}
737
 
 
738
 
xtPublic xtInt8 xt_ind_get_size()
739
 
{
740
 
        xtInt8 size = 0;
741
 
 
742
 
        size = (xtInt8) ind_cac_globals.cg_block_count * (xtInt8) XT_INDEX_PAGE_SIZE;
743
 
        return size;
744
 
}
745
 
 
746
 
xtPublic u_int xt_ind_get_blocks()
747
 
{
748
 
        return ind_cac_globals.cg_block_count;
749
 
}
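
/* Relation between the three reporting functions above, with example values:
 * if 2048 blocks are configured and 512 are still on the free list, then
 * xt_ind_get_blocks() reports 2048, xt_ind_get_size() reports
 * 2048 * XT_INDEX_PAGE_SIZE bytes, and xt_ind_get_usage() reports
 * (2048 - 512) * XT_INDEX_PAGE_SIZE bytes.
 */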
750
 
 
751
 
/*
752
 
 * -----------------------------------------------------------------------
753
 
 * INDEX CHECKING
754
 
 */
755
 
 
756
 
xtPublic void xt_ind_check_cache(XTIndexPtr ind)
757
 
{
758
 
        XTIndBlockPtr   block;
759
 
        u_int                   free_count, inuse_count, clean_count;
760
 
        xtBool                  check_count = FALSE;
761
 
 
762
 
        if (ind == (XTIndex *) 1) {
763
 
                ind = NULL;
764
 
                check_count = TRUE;
765
 
        }
766
 
 
767
 
        // Check the dirty list:
768
 
        if (ind) {
769
 
                u_int cnt = 0;
770
 
 
771
 
                block = ind->mi_dirty_list;
772
 
                while (block) {
773
 
                        cnt++;
774
 
                        ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_DIRTY);
775
 
                        block = block->cb_dirty_next;
776
 
                }
777
 
                ASSERT_NS(ind->mi_dirty_blocks == cnt);
778
 
        }
779
 
 
780
 
        xt_lock_mutex_ns(&ind_cac_globals.cg_lock);
781
 
 
782
 
        // Check the free list:
783
 
        free_count = 0;
784
 
        block = ind_cac_globals.cg_free_list;
785
 
        while (block) {
786
 
                free_count++;
787
 
                ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FREE);
788
 
                block = block->cb_next;
789
 
        }
790
 
        ASSERT_NS(ind_cac_globals.cg_free_count == free_count);
791
 
 
792
 
        /* Check the LRU list: */
793
 
        XTIndBlockPtr list_block, plist_block;
794
 
        
795
 
        plist_block = NULL;
796
 
        list_block = ind_cac_globals.cg_lru_block;
797
 
        if (list_block) {
798
 
                ASSERT_NS(ind_cac_globals.cg_mru_block != NULL);
799
 
                ASSERT_NS(ind_cac_globals.cg_mru_block->cb_mr_used == NULL);
800
 
                ASSERT_NS(list_block->cb_lr_used == NULL);
801
 
                inuse_count = 0;
802
 
                clean_count = 0;
803
 
                while (list_block) {
804
 
                        inuse_count++;
805
 
                        ASSERT_NS(IDX_CAC_NOT_FREE(list_block->cb_state));
806
 
                        if (list_block->cb_state == IDX_CAC_BLOCK_CLEAN)
807
 
                                clean_count++;
808
 
                        ASSERT_NS(block != list_block);
809
 
                        ASSERT_NS(list_block->cb_lr_used == plist_block);
810
 
                        plist_block = list_block;
811
 
                        list_block = list_block->cb_mr_used;
812
 
                }
813
 
                ASSERT_NS(ind_cac_globals.cg_mru_block == plist_block);
814
 
        }
815
 
        else {
816
 
                inuse_count = 0;
817
 
                clean_count = 0;
818
 
                ASSERT_NS(ind_cac_globals.cg_mru_block == NULL);
819
 
        }
820
 
 
821
 
#ifdef DEBUG_CHECK_IND_CACHE
822
 
        ASSERT_NS(free_count + inuse_count + ind_cac_globals.cg_reserved_by_ots + ind_cac_globals.cg_read_count == ind_cac_globals.cg_block_count);
823
 
#endif
824
 
        xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);
825
 
        if (check_count) {
826
 
                /* We have just flushed, check how much is now free/clean. */
827
 
                if (free_count + clean_count < 10) {
828
 
                        /* This could be a problem: */
829
 
                        printf("Cache very low!\n");
830
 
                }
831
 
        }
832
 
}
833
 
 
834
 
/*
835
 
 * -----------------------------------------------------------------------
836
 
 * FREEING INDEX CACHE
837
 
 */
838
 
 
839
 
/*
840
 
 * This function returns TRUE if the block is freed.
841
 
 * This function returns FALSE if the block cannot be found, or the
842
 
 * block is not clean.
843
 
 *
844
 
 * We also return FALSE if we cannot copy the block to the handle
845
 
 * (if this is required). This will be due to out-of-memory!
846
 
 */
847
 
static xtBool ind_free_block(XTOpenTablePtr ot, XTIndBlockPtr block)
848
 
{
849
 
        XTIndBlockPtr   xblock, pxblock;
850
 
        u_int                   hash_idx;
851
 
        u_int                   file_id;
852
 
        xtIndexNodeID   address;
853
 
        DcSegmentPtr    seg;
854
 
 
855
 
        (void) ot; /*DRIZZLED*/
856
 
 
857
 
#ifdef DEBUG_CHECK_IND_CACHE
858
 
        xt_ind_check_cache(NULL);
859
 
#endif
860
 
        file_id = block->cb_file_id;
861
 
        address = block->cb_address;
862
 
 
863
 
        hash_idx = XT_NODE_ID(address) + (file_id * 223);
864
 
        seg = &ind_cac_globals.cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
865
 
        hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % ind_cac_globals.cg_hash_size;
866
 
 
867
 
        IDX_CAC_WRITE_LOCK(seg, ot->ot_thread);
868
 
 
869
 
        pxblock = NULL;
870
 
        xblock = seg->cs_hash_table[hash_idx];
871
 
        while (xblock) {
872
 
                if (block == xblock) {
873
 
                        /* Found the block... */
874
 
                        /* It is possible that a thread enters this code holding a
875
 
                         * lock on a page. This can cause a deadlock:
876
 
                         *
877
 
                         * #0   0x91faa2ce in semaphore_wait_signal_trap
878
 
                         * #1   0x91fb1da5 in pthread_mutex_lock
879
 
                         * #2   0x00e2ec13 in xt_p_mutex_lock at pthread_xt.cc:544
880
 
                         * #3   0x00e6c30a in xt_xsmutex_xlock at lock_xt.cc:1547
881
 
                         * #4   0x00dee402 in ind_free_block at cache_xt.cc:879
882
 
                         * #5   0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033
883
 
                         * #6   0x00def8d1 in xt_ind_reserve at cache_xt.cc:1513
884
 
                         * #7   0x00e22118 in xt_idx_insert at index_xt.cc:2047
885
 
                         * #8   0x00e4d7ee in xt_tab_new_record at table_xt.cc:4702
886
 
                         * #9   0x00e0ff0b in ha_pbxt::write_row at ha_pbxt.cc:2340
887
 
                         * #10  0x0023a00f in handler::ha_write_row at handler.cc:4570
888
 
                         * #11  0x001a32c8 in write_record at sql_insert.cc:1568
889
 
                         * #12  0x001ab635 in mysql_insert at sql_insert.cc:812
890
 
                         * #13  0x0010e068 in mysql_execute_command at sql_parse.cc:3066
891
 
                         * #14  0x0011480d in mysql_parse at sql_parse.cc:5787
892
 
                         * #15  0x00115afb in dispatch_command at sql_parse.cc:1200
893
 
                         * #16  0x00116de2 in do_command at sql_parse.cc:857
894
 
                         * #17  0x00101ee4 in handle_one_connection at sql_connect.cc:1115
895
 
                         * #18  0x91fdb155 in _pthread_start
896
 
                         * #19  0x91fdb012 in thread_start
897
 
                         * 
898
 
                         * #0   0x91fb146e in __semwait_signal
899
 
                         * #1   0x91fb12ef in nanosleep$UNIX2003
900
 
                         * #2   0x91fb1236 in usleep$UNIX2003
901
 
                         * #3   0x00e52112 in xt_yield at thread_xt.cc:1274
902
 
                         * #4   0x00e6c0eb in xt_spinxslock_xlock at lock_xt.cc:1456
903
 
                         * #5   0x00dee444 in ind_free_block at cache_xt.cc:886
904
 
                         * #6   0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033
905
 
                         * #7   0x00deeaf0 in ind_cac_fetch at cache_xt.cc:1130
906
 
                         * #8   0x00def604 in xt_ind_fetch at cache_xt.cc:1386
907
 
                         * #9   0x00e2159a in xt_idx_update_row_id at index_xt.cc:2489
908
 
                         * #10  0x00e603c8 in xn_sw_clean_indices at xaction_xt.cc:1932
909
 
                         * #11  0x00e606d4 in xn_sw_cleanup_variation at xaction_xt.cc:2056
910
 
                         * #12  0x00e60e29 in xn_sw_cleanup_xact at xaction_xt.cc:2276
911
 
                         * #13  0x00e615ed in xn_sw_main at xaction_xt.cc:2433
912
 
                         * #14  0x00e61919 in xn_sw_run_thread at xaction_xt.cc:2564
913
 
                         * #15  0x00e53f80 in thr_main at thread_xt.cc:1017
914
 
                         * #16  0x91fdb155 in _pthread_start
915
 
                         * #17  0x91fdb012 in thread_start
916
 
                         *
917
 
                         * So we back off if a lock is held!
918
 
                         */
919
 
                        if (!XT_IPAGE_WRITE_TRY_LOCK(&block->cb_lock, ot->ot_thread->t_id)) {
920
 
                                IDX_CAC_UNLOCK(seg, ot->ot_thread);
921
 
#ifdef DEBUG_CHECK_IND_CACHE
922
 
                                xt_ind_check_cache(NULL);
923
 
#endif
924
 
                                return FALSE;
925
 
                        }
926
 
                        if (block->cb_state != IDX_CAC_BLOCK_CLEAN) {
927
 
                                /* This block cannot be freed: */
928
 
                                XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
929
 
                                IDX_CAC_UNLOCK(seg, ot->ot_thread);
930
 
#ifdef DEBUG_CHECK_IND_CACHE
931
 
                                xt_ind_check_cache(NULL);
932
 
#endif
933
 
                                return FALSE;
934
 
                        }
935
 
                        
936
 
                        goto free_the_block;
937
 
                }
938
 
                pxblock = xblock;
939
 
                xblock = xblock->cb_next;
940
 
        }
941
 
 
942
 
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
943
 
 
944
 
        /* Not found (this can happen, if block was freed by another thread) */
945
 
#ifdef DEBUG_CHECK_IND_CACHE
946
 
        xt_ind_check_cache(NULL);
947
 
#endif
948
 
        return FALSE;
949
 
 
950
 
        free_the_block:
951
 
 
952
 
        /* If the block is referenced by a handle, then we
953
 
         * have to copy the data to the handle before we
954
 
         * free the page:
955
 
         */
956
 
        /* {HANDLE-COUNT-USAGE}
957
 
         * This access is safe because:
958
 
         *
959
 
         * We have an Xlock on the cache block, which excludes
960
 
         * all other writers that want to change the cache block
961
 
         * and also all readers of the cache block, because
962
 
         * they all have at least an Slock on the cache block.
963
 
         */
964
 
        if (block->cb_handle_count) {
965
 
                XTIndReferenceRec       iref;
966
 
                
967
 
                iref.ir_xlock = TRUE;
968
 
                iref.ir_updated = FALSE;
969
 
                iref.ir_block = block;
970
 
                iref.ir_branch = (XTIdxBranchDPtr) block->cb_data;
971
 
                if (!xt_ind_copy_on_write(&iref)) {
972
 
                        XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
973
 
                        return FALSE;
974
 
                }
975
 
        }
976
 
 
977
 
        /* Block is clean, remove from the hash table: */
978
 
        if (pxblock)
979
 
                pxblock->cb_next = block->cb_next;
980
 
        else
981
 
                seg->cs_hash_table[hash_idx] = block->cb_next;
982
 
 
983
 
        xt_lock_mutex_ns(&ind_cac_globals.cg_lock);
984
 
 
985
 
        /* Remove from the MRU list: */
986
 
        if (ind_cac_globals.cg_lru_block == block)
987
 
                ind_cac_globals.cg_lru_block = block->cb_mr_used;
988
 
        if (ind_cac_globals.cg_mru_block == block)
989
 
                ind_cac_globals.cg_mru_block = block->cb_lr_used;
990
 
        
991
 
        /* Note, I am updating blocks for which I have no lock
992
 
         * here. But I think this is OK because I have a lock
993
 
         * for the MRU list.
994
 
         */
995
 
        if (block->cb_lr_used)
996
 
                block->cb_lr_used->cb_mr_used = block->cb_mr_used;
997
 
        if (block->cb_mr_used)
998
 
                block->cb_mr_used->cb_lr_used = block->cb_lr_used;
999
 
 
1000
 
        /* The block is now free: */
1001
 
        block->cb_next = ind_cac_globals.cg_free_list;
1002
 
        ind_cac_globals.cg_free_list = block;
1003
 
        ind_cac_globals.cg_free_count++;
1004
 
        ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
1005
 
        block->cb_state = IDX_CAC_BLOCK_FREE;
1006
 
        IDX_TRACE("%d- f%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(block->cb_data));
1007
 
 
1008
 
        /* Unlock BEFORE the block is reused! */
1009
 
        XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
1010
 
 
1011
 
        xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);
1012
 
 
1013
 
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
1014
 
 
1015
 
#ifdef DEBUG_CHECK_IND_CACHE
1016
 
        xt_ind_check_cache(NULL);
1017
 
#endif
1018
 
        return TRUE;
1019
 
}
1020
 
 
1021
 
#define IND_CACHE_MAX_BLOCKS_TO_FREE            100
1022
 
 
1023
 
/*
1024
 
 * Return the number of blocks freed.
1025
 
 *
1026
 
 * The idea is to grab a list of blocks to free.
1027
 
 * The list consists of the LRU blocks that are
1028
 
 * clean.
1029
 
 *
1030
 
 * Free as many as possible (up to a maximum of blocks_required)
1031
 
 * from the list, even if LRU position has changed
1032
 
 * (or we have a race if there are too few blocks).
1033
 
 * However, if the block cannot be found, or is dirty,
1034
 
 * we must skip it.
1035
 
 *
1036
 
 * Repeat until we find no blocks for the list, or
1037
 
 * we have freed 'blocks_required'.
1038
 
 *
1039
 
 * 'not_this' is a block that must not be freed because
1040
 
 * it is locked by the calling thread!
1041
 
 */
1042
 
static u_int ind_cac_free_lru_blocks(XTOpenTablePtr ot, u_int blocks_required, XTIdxBranchDPtr not_this)
1043
 
{
1044
 
        register DcGlobalsRec   *dcg = &ind_cac_globals;
1045
 
        XTIndBlockPtr                   to_free[IND_CACHE_MAX_BLOCKS_TO_FREE];
1046
 
        int                                             count;
1047
 
        XTIndBlockPtr                   block;
1048
 
        u_int                                   blocks_freed = 0;
1049
 
        XTIndBlockPtr                   locked_block;
1050
 
 
1051
 
#ifdef XT_USE_DIRECT_IO_ON_INDEX
1052
 
#error This will not work!
1053
 
#endif
1054
 
        locked_block = (XTIndBlockPtr) ((xtWord1 *) not_this - offsetof(XTIndBlockRec, cb_data));
1055
 
 
1056
 
        retry:
1057
 
        xt_lock_mutex_ns(&ind_cac_globals.cg_lock);
1058
 
        block = dcg->cg_lru_block;
1059
 
        count = 0;
1060
 
        while (block && count < IND_CACHE_MAX_BLOCKS_TO_FREE) {
1061
 
                if (block != locked_block && block->cb_state == IDX_CAC_BLOCK_CLEAN) {
1062
 
                        to_free[count] = block;
1063
 
                        count++;
1064
 
                }
1065
 
                block = block->cb_mr_used;
1066
 
        }
1067
 
        xt_unlock_mutex_ns(&ind_cac_globals.cg_lock);
1068
 
 
1069
 
        if (!count)
1070
 
                return blocks_freed;
1071
 
 
1072
 
        for (int i=0; i<count; i++) {
1073
 
                if (ind_free_block(ot, to_free[i]))
1074
 
                        blocks_freed++;
1075
 
                if (blocks_freed >= blocks_required &&
1076
 
                        ind_cac_globals.cg_free_count >= ind_cac_globals.cg_max_free + blocks_required)
1077
 
                return blocks_freed;
1078
 
        }
1079
 
 
1080
 
        goto retry;
1081
 
}
1082
 
 
1083
 
/*
1084
 
 * -----------------------------------------------------------------------
1085
 
 * MAIN CACHE FUNCTIONS
1086
 
 */
1087
 
 
1088
 
/*
1089
 
 * Fetch the block. Note, if we are about to write the block
1090
 
 * then there is no need to read it from disk!
1091
 
 */
1092
 
static XTIndBlockPtr ind_cac_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, DcSegmentPtr *ret_seg, xtBool read_data)
1093
 
{
1094
 
        register XTOpenFilePtr  file = ot->ot_ind_file;
1095
 
        register XTIndBlockPtr  block, new_block;
1096
 
        register DcSegmentPtr   seg;
1097
 
        register u_int                  hash_idx;
1098
 
        register DcGlobalsRec   *dcg = &ind_cac_globals;
1099
 
        size_t                                  red_size;
1100
 
 
1101
 
#ifdef DEBUG_CHECK_IND_CACHE
1102
 
        xt_ind_check_cache(NULL);
1103
 
#endif
1104
 
        /* Address, plus file ID multiplied by my favorite prime number! */
1105
 
        hash_idx = XT_NODE_ID(address) + (file->fr_id * 223);
1106
 
        seg = &dcg->cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
1107
 
        hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % dcg->cg_hash_size;
1108
 
 
1109
 
        IDX_CAC_READ_LOCK(seg, ot->ot_thread);
1110
 
        block = seg->cs_hash_table[hash_idx];
1111
 
        while (block) {
1112
 
                if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
1113
 
                        ASSERT_NS(block->cb_state != IDX_CAC_BLOCK_FREE);
1114
 
 
1115
 
                        /* Check how recently this page has been used: */
1116
 
                        if (XT_TIME_DIFF(block->cb_ru_time, dcg->cg_ru_now) > (dcg->cg_block_count >> 1)) {
1117
 
                                xt_lock_mutex_ns(&dcg->cg_lock);
1118
 
 
1119
 
                                /* Move to the front of the MRU list: */
1120
 
                                block->cb_ru_time = ++dcg->cg_ru_now;
1121
 
                                if (dcg->cg_mru_block != block) {
1122
 
                                        /* Remove from the MRU list: */
1123
 
                                        if (dcg->cg_lru_block == block)
1124
 
                                                dcg->cg_lru_block = block->cb_mr_used;
1125
 
                                        if (block->cb_lr_used)
1126
 
                                                block->cb_lr_used->cb_mr_used = block->cb_mr_used;
1127
 
                                        if (block->cb_mr_used)
1128
 
                                                block->cb_mr_used->cb_lr_used = block->cb_lr_used;
1129
 
 
1130
 
                                        /* Make the block the most recently used: */
1131
 
                                        if ((block->cb_lr_used = dcg->cg_mru_block))
1132
 
                                                dcg->cg_mru_block->cb_mr_used = block;
1133
 
                                        block->cb_mr_used = NULL;
1134
 
                                        dcg->cg_mru_block = block;
1135
 
                                        if (!dcg->cg_lru_block)
1136
 
                                                dcg->cg_lru_block = block;
1137
 
                                }
1138
 
 
1139
 
                                xt_unlock_mutex_ns(&dcg->cg_lock);
1140
 
                        }
1141
 
                
1142
 
                        *ret_seg = seg;
1143
 
#ifdef DEBUG_CHECK_IND_CACHE
1144
 
                        xt_ind_check_cache(NULL);
1145
 
#endif
1146
 
                        ot->ot_thread->st_statistics.st_ind_cache_hit++;
1147
 
                        return block;
1148
 
                }
1149
 
                block = block->cb_next;
1150
 
        }
1151
 
        
1152
 
        /* Block not found... */
1153
 
        IDX_CAC_UNLOCK(seg, ot->ot_thread);
1154
 
 
1155
 
        /* Check the open table reserve list first: */
1156
 
        if ((new_block = ot->ot_ind_res_bufs)) {
1157
 
                ot->ot_ind_res_bufs = new_block->cb_next;
1158
 
                ot->ot_ind_res_count--;
1159
 
#ifdef DEBUG_CHECK_IND_CACHE
1160
 
                xt_lock_mutex_ns(&dcg->cg_lock);
1161
 
                dcg->cg_reserved_by_ots--;
1162
 
                dcg->cg_read_count++;
1163
 
                xt_unlock_mutex_ns(&dcg->cg_lock);
1164
 
#endif
1165
 
                goto use_free_block;
1166
 
        }
1167
 
 
1168
 
        free_some_blocks:
1169
 
        if (!dcg->cg_free_list) {
1170
 
                if (!ind_cac_free_lru_blocks(ot, 1, NULL)) {
1171
 
                        if (!dcg->cg_free_list) {
1172
 
                                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_INDEX_CACHE);
1173
 
#ifdef DEBUG_CHECK_IND_CACHE
1174
 
                                xt_ind_check_cache(NULL);
1175
 
#endif
1176
 
                                return NULL;
1177
 
                        }
1178
 
                }
1179
 
        }
1180
 
 
1181
 
        /* Get a free block: */
1182
 
        xt_lock_mutex_ns(&dcg->cg_lock);
1183
 
        if (!(new_block = dcg->cg_free_list)) {
1184
 
                xt_unlock_mutex_ns(&dcg->cg_lock);
1185
 
                goto free_some_blocks;
1186
 
        }
1187
 
        ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_FREE);
1188
 
        dcg->cg_free_list = new_block->cb_next;
1189
 
        dcg->cg_free_count--;
1190
 
#ifdef DEBUG_CHECK_IND_CACHE
1191
 
        dcg->cg_read_count++;
1192
 
#endif
1193
 
        xt_unlock_mutex_ns(&dcg->cg_lock);
1194
 
 
1195
 
        use_free_block:
1196
 
        new_block->cb_address = address;
1197
 
        new_block->cb_file_id = file->fr_id;
1198
 
        ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_FREE);
1199
 
        new_block->cb_state = IDX_CAC_BLOCK_CLEAN;
1200
 
        new_block->cb_handle_count = 0;
1201
 
        new_block->cp_del_count = 0;
1202
 
        new_block->cb_dirty_next = NULL;
1203
 
        new_block->cb_dirty_prev = NULL;
1204
 
#ifdef IND_OPT_DATA_WRITTEN
1205
 
        new_block->cb_header = FALSE;
1206
 
        new_block->cb_min_pos = 0xFFFF;
1207
 
	new_block->cb_max_pos = 0;
#endif

	if (read_data) {
		if (!xt_pread_file(file, xt_ind_node_to_offset(ot->ot_table, address), XT_INDEX_PAGE_SIZE, 0, new_block->cb_data, &red_size, &ot->ot_thread->st_statistics.st_ind, ot->ot_thread)) {
			xt_lock_mutex_ns(&dcg->cg_lock);
			new_block->cb_next = dcg->cg_free_list;
			dcg->cg_free_list = new_block;
			dcg->cg_free_count++;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_read_count--;
#endif
			ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_CLEAN);
			new_block->cb_state = IDX_CAC_BLOCK_FREE;
			IDX_TRACE("%d- F%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
			xt_unlock_mutex_ns(&dcg->cg_lock);
#ifdef DEBUG_CHECK_IND_CACHE
			xt_ind_check_cache(NULL);
#endif
			return NULL;
		}
		IDX_TRACE("%d- R%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
		ot->ot_thread->st_statistics.st_ind_cache_miss++;
	}
	else
		red_size = 0;
	// PMC - I don't think this is required! memset(new_block->cb_data + red_size, 0, XT_INDEX_PAGE_SIZE - red_size);

	IDX_CAC_WRITE_LOCK(seg, ot->ot_thread);
	block = seg->cs_hash_table[hash_idx];
	while (block) {
		if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
			/* Oops, someone else was faster! */
			xt_lock_mutex_ns(&dcg->cg_lock);
			new_block->cb_next = dcg->cg_free_list;
			dcg->cg_free_list = new_block;
			dcg->cg_free_count++;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_read_count--;
#endif
			ASSERT_NS(new_block->cb_state == IDX_CAC_BLOCK_CLEAN);
			new_block->cb_state = IDX_CAC_BLOCK_FREE;
			IDX_TRACE("%d- F%x\n", (int) XT_NODE_ID(address), (int) XT_GET_DISK_2(new_block->cb_data));
			xt_unlock_mutex_ns(&dcg->cg_lock);
			goto done_ok;
		}
		block = block->cb_next;
	}
	block = new_block;

	/* Make the block the most recently used: */
	xt_lock_mutex_ns(&dcg->cg_lock);
	block->cb_ru_time = ++dcg->cg_ru_now;
	if ((block->cb_lr_used = dcg->cg_mru_block))
		dcg->cg_mru_block->cb_mr_used = block;
	block->cb_mr_used = NULL;
	dcg->cg_mru_block = block;
	if (!dcg->cg_lru_block)
		dcg->cg_lru_block = block;
#ifdef DEBUG_CHECK_IND_CACHE
	dcg->cg_read_count--;
#endif
	xt_unlock_mutex_ns(&dcg->cg_lock);

	/* {LAZY-DEL-INDEX-ITEMS}
	 * Conditionally count the number of deleted entries in the index:
	 * We do this before other threads can read the block.
	 */
	if (ind && ind->mi_lazy_delete && read_data)
		xt_ind_count_deleted_items(ot->ot_table, ind, block);

	/* Add to the hash table: */
	block->cb_next = seg->cs_hash_table[hash_idx];
	seg->cs_hash_table[hash_idx] = block;

	done_ok:
	*ret_seg = seg;
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	return block;
}
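
/* The "most recently used" update above is the standard doubly-linked LRU
 * list promotion: the block is pushed onto the MRU end of the list, and the
 * LRU end is only set when the list was empty. The following is a minimal
 * sketch of that pattern using simplified stand-in types (ExampleBlock,
 * ExampleCache and example_make_mru are illustrative only, not PBXT types):
 */
#if 0
typedef struct ExampleBlock {
	struct ExampleBlock	*lr_used;	/* towards the least recently used end */
	struct ExampleBlock	*mr_used;	/* towards the most recently used end */
} ExampleBlock;

typedef struct ExampleCache {
	ExampleBlock	*lru_block;		/* least recently used (eviction candidate) */
	ExampleBlock	*mru_block;		/* most recently used */
} ExampleCache;

static void example_make_mru(ExampleCache *cache, ExampleBlock *block)
{
	/* Link the block in front of the current MRU block: */
	if ((block->lr_used = cache->mru_block))
		cache->mru_block->mr_used = block;
	block->mr_used = NULL;
	cache->mru_block = block;
	/* If the list was empty, the new block is also the LRU block: */
	if (!cache->lru_block)
		cache->lru_block = block;
}
#endif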

static xtBool ind_cac_get(XTOpenTablePtr ot, xtIndexNodeID address, DcSegmentPtr *ret_seg, XTIndBlockPtr *ret_block)
{
	register XTOpenFilePtr	file = ot->ot_ind_file;
	register XTIndBlockPtr	block;
	register DcSegmentPtr	seg;
	register u_int			hash_idx;
	register DcGlobalsRec	*dcg = &ind_cac_globals;

	hash_idx = XT_NODE_ID(address) + (file->fr_id * 223);
	seg = &dcg->cg_segment[hash_idx & IDX_CAC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_INDEX_CACHE_SEGMENT_SHIFTS) % dcg->cg_hash_size;

	IDX_CAC_READ_LOCK(seg, ot->ot_thread);
	block = seg->cs_hash_table[hash_idx];
	while (block) {
		if (XT_NODE_ID(block->cb_address) == XT_NODE_ID(address) && block->cb_file_id == file->fr_id) {
			ASSERT_NS(block->cb_state != IDX_CAC_BLOCK_FREE);

			*ret_seg = seg;
			*ret_block = block;
			return OK;
		}
		block = block->cb_next;
	}
	IDX_CAC_UNLOCK(seg, ot->ot_thread);

	/* Block not found: */
	*ret_seg = NULL;
	*ret_block = NULL;
	return OK;
}
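
/* The lookup above uses a segmented hash: one hash value is computed from the
 * node ID and file ID, the low bits select the segment (each segment has its
 * own read/write lock), and the remaining bits select the bucket within that
 * segment. A minimal sketch of the same split, with made-up constants and
 * names (the EXAMPLE_* macros and example_locate are illustrative only):
 */
#if 0
#define EXAMPLE_SEGMENT_SHIFTS	3
#define EXAMPLE_SEGMENT_COUNT	(1 << EXAMPLE_SEGMENT_SHIFTS)
#define EXAMPLE_SEGMENT_MASK	(EXAMPLE_SEGMENT_COUNT - 1)
#define EXAMPLE_HASH_SIZE		1021

static void example_locate(unsigned int node_id, unsigned int file_id, unsigned int *seg_idx, unsigned int *bucket_idx)
{
	unsigned int h = node_id + (file_id * 223);	/* same mixing idea as above */

	*seg_idx = h & EXAMPLE_SEGMENT_MASK;						/* low bits: which lock segment */
	*bucket_idx = (h >> EXAMPLE_SEGMENT_SHIFTS) % EXAMPLE_HASH_SIZE;	/* rest: bucket in that segment */
}
#endif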

xtPublic xtBool xt_ind_write(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, size_t size, xtWord1 *data)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!(block = ind_cac_fetch(ot, ind, address, &seg, FALSE)))
		return FAILED;

	XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
	if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
		if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
			XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
			IDX_CAC_UNLOCK(seg, ot->ot_thread);
			return FAILED;
		}
	}
#ifdef IND_OPT_DATA_WRITTEN
	block->cb_header = TRUE;
	block->cb_min_pos = 0;
	if (size-XT_INDEX_PAGE_HEAD_SIZE > block->cb_max_pos)
		block->cb_max_pos = size-XT_INDEX_PAGE_HEAD_SIZE;
	ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-XT_INDEX_PAGE_HEAD_SIZE);
	ASSERT_NS(block->cb_min_pos < block->cb_max_pos);
#endif
	ASSERT_NS(IDX_CAC_MODIFYABLE(block->cb_state));
	memcpy(block->cb_data, data, size);
	if (block->cb_state != IDX_CAC_BLOCK_DIRTY) {
		TRACK_BLOCK_WRITE(offset);
		xt_spinlock_lock(&ind->mi_dirty_lock);
		if ((block->cb_dirty_next = ind->mi_dirty_list))
			ind->mi_dirty_list->cb_dirty_prev = block;
		block->cb_dirty_prev = NULL;
		ind->mi_dirty_list = block;
		ind->mi_dirty_blocks++;
		xt_spinlock_unlock(&ind->mi_dirty_lock);
		if (block->cb_state != IDX_CAC_BLOCK_LOGGED) {
			ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
			ot->ot_thread->st_statistics.st_ind_cache_dirty++;
		}
		block->cb_state = IDX_CAC_BLOCK_DIRTY;
	}
	XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
	IDX_CAC_UNLOCK(seg, ot->ot_thread);
#ifdef XT_TRACK_INDEX_UPDATES
	ot->ot_ind_changed++;
#endif
#ifdef CHECK_BLOCK_TRAILERS
	check_block_trailers();
#endif
	return OK;
}
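
/* Under IND_OPT_DATA_WRITTEN the block records which byte range of the page
 * has been modified (cb_min_pos/cb_max_pos), so a later flush can write just
 * the header plus that range instead of the whole page. A minimal sketch of
 * such dirty-range tracking (ExamplePage and example_mark_written are
 * illustrative only, not the PBXT structures):
 */
#if 0
typedef struct ExamplePage {
	unsigned short	min_pos;	/* start of the modified range, 0xFFFF = nothing modified */
	unsigned short	max_pos;	/* end of the modified range */
} ExamplePage;

static void example_mark_written(ExamplePage *page, unsigned short start, unsigned short end)
{
	/* Grow the recorded dirty range to cover [start, end): */
	if (start < page->min_pos)
		page->min_pos = start;
	if (end > page->max_pos)
		page->max_pos = end;
}
#endif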

/*
 * Update the cache, if in RAM.
 */
xtPublic xtBool xt_ind_write_cache(XTOpenTablePtr ot, xtIndexNodeID address, size_t size, xtWord1 *data)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!ind_cac_get(ot, address, &seg, &block))
		return FAILED;

	if (block) {
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
		/* This should only be done to pages that are free, which
		 * are not on the dirty list, so they must be clean!
		 */
		ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
		memcpy(block->cb_data, data, size);

		XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
		IDX_CAC_UNLOCK(seg, ot->ot_thread);
	}

	return OK;
}

xtPublic xtBool xt_ind_get(XTOpenTablePtr ot, xtIndexNodeID address, XTIndReferencePtr iref)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!ind_cac_get(ot, address, &seg, &block))
		return FAILED;

	if (block) {
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
		ASSERT_NS(IDX_CAC_NOT_FREE(block->cb_state));
		IDX_CAC_UNLOCK(seg, ot->ot_thread);
		iref->ir_block = block;
		iref->ir_branch = (XTIdxBranchDPtr) block->cb_data;
	}
	else {
		iref->ir_block = NULL;
		iref->ir_branch = NULL;
	}
	iref->ir_xlock = TRUE;
	iref->ir_updated = FALSE;

	return OK;
}

/*
 * Note, this function may only be called if the block has
 * been freed.
 */
xtPublic xtBool xt_ind_free_block(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!ind_cac_get(ot, address, &seg, &block))
		return FAILED;
	if (block) {
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);

		if (block->cb_state == IDX_CAC_BLOCK_FLUSHING) {
			if (!ot->ot_table->tab_ind_flush_ilog->il_write_block(ot, block)) {
				XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
				IDX_CAC_UNLOCK(seg, ot->ot_thread);
				return FAILED;
			}
		}

		/* {PAGE-NO-IN-INDEX-FILE}
		 * This is the one exception to the rule that a block
		 * that is in the IDX_CAC_BLOCK_LOGGED state may be released
		 * from the cache!
		 */
		ASSERT_NS(IDX_CAC_MODIFYABLE(block->cb_state));

		if (block->cb_state == IDX_CAC_BLOCK_DIRTY) {
			/* Take the block off the dirty list: */
			xt_spinlock_lock(&ind->mi_dirty_lock);
			if (block->cb_dirty_next)
				block->cb_dirty_next->cb_dirty_prev = block->cb_dirty_prev;
			if (block->cb_dirty_prev)
				block->cb_dirty_prev->cb_dirty_next = block->cb_dirty_next;
			if (ind->mi_dirty_list == block)
				ind->mi_dirty_list = block->cb_dirty_next;
			ind->mi_dirty_blocks--;
			xt_spinlock_unlock(&ind->mi_dirty_lock);
			block->cb_state = IDX_CAC_BLOCK_CLEAN;
			ot->ot_thread->st_statistics.st_ind_cache_dirty--;
#ifdef IND_OPT_DATA_WRITTEN
			block->cb_header = FALSE;
			block->cb_min_pos = 0xFFFF;
			block->cb_max_pos = 0;
#endif
		}
		XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);

		IDX_CAC_UNLOCK(seg, ot->ot_thread);
	}

	return OK;
}

xtPublic xtBool xt_ind_read_bytes(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, size_t size, xtWord1 *data)
{
	XTIndBlockPtr	block;
	DcSegmentPtr	seg;

	if (!(block = ind_cac_fetch(ot, ind, address, &seg, TRUE)))
		return FAILED;

	XT_IPAGE_READ_LOCK(&block->cb_lock);
	memcpy(data, block->cb_data, size);
	XT_IPAGE_UNLOCK(&block->cb_lock, FALSE);
	IDX_CAC_UNLOCK(seg, ot->ot_thread);
	return OK;
}

xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID address, XTPageLockType ltype, XTIndReferencePtr iref)
{
	register XTIndBlockPtr	block;
	DcSegmentPtr			seg;
	xtWord2					branch_size;
	u_int					rec_size;
	xtBool					xlock = FALSE;

#ifdef DEBUG
	ASSERT_NS(iref->ir_xlock == 2);
	ASSERT_NS(iref->ir_updated == 2);
#endif
	if (!(block = ind_cac_fetch(ot, ind, address, &seg, TRUE)))
		return FAILED;

	branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
	rec_size = XT_GET_INDEX_BLOCK_LEN(branch_size);
	if (rec_size < 2 || rec_size > XT_INDEX_PAGE_SIZE)
		goto failed_corrupt;
	if (ind->mi_fix_key) {
		rec_size -= 2;
		if (XT_IS_NODE(branch_size)) {
			if (rec_size != 0) {
				if (rec_size < XT_NODE_REF_SIZE)
					goto failed_corrupt;
				rec_size -= XT_NODE_REF_SIZE;
				if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE + XT_NODE_REF_SIZE)) != 0)
					goto failed_corrupt;
			}
		}
		else {
			if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE)) != 0)
				goto failed_corrupt;
		}
	}

	switch (ltype) {
		case XT_LOCK_READ:
			break;
		case XT_LOCK_WRITE:
			xlock = TRUE;
			break;
		case XT_XLOCK_LEAF:
			if (!XT_IS_NODE(branch_size))
				xlock = TRUE;
			break;
		case XT_XLOCK_DEL_LEAF:
			if (!XT_IS_NODE(branch_size)) {
				if (ot->ot_table->tab_dic.dic_no_lazy_delete)
					xlock = TRUE;
				else {
					/*
					 * {LAZY-DEL-INDEX-ITEMS}
					 *
					 * We are fetching a page in order to delete from it.
					 * We decide here whether we plan to do a lazy delete,
					 * or whether we plan to compact the node.
					 *
					 * A lazy delete just requires a shared lock.
					 */
					if (ind->mi_lazy_delete) {
						/* If the number of deleted items is greater than
						 * half of the number of items that can fit in the
						 * page, then we will compact the node.
						 */
						if (!xt_idx_lazy_delete_on_leaf(ind, block, XT_GET_INDEX_BLOCK_LEN(branch_size)))
							xlock = TRUE;
					}
					else
						xlock = TRUE;
				}
			}
			break;
	}

	if ((iref->ir_xlock = xlock))
		XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
	else
		XT_IPAGE_READ_LOCK(&block->cb_lock);

	IDX_CAC_UNLOCK(seg, ot->ot_thread);

	/* {DIRECT-IO}
	 * Direct I/O requires that the buffer is 512-byte aligned.
	 * To achieve this, cb_data is a pointer rather than an
	 * embedded array.
	 * As a result, we need to pass a pointer to both the
	 * cache block and the cache block data:
	 */
	iref->ir_updated = FALSE;
	iref->ir_block = block;
	iref->ir_branch = (XTIdxBranchDPtr) block->cb_data;
	return OK;

	failed_corrupt:
	IDX_CAC_UNLOCK(seg, ot->ot_thread);
	xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
	return FAILED;
}
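
/* The {DIRECT-IO} note above refers to the usual O_DIRECT constraint: the I/O
 * buffer must be suitably aligned (512 bytes here), so the page data cannot
 * be an embedded struct member at an arbitrary offset and is allocated
 * separately instead. A minimal sketch of allocating such a buffer
 * (example_alloc_aligned_page is illustrative only, not the PBXT allocator):
 */
#if 0
#include <stdlib.h>

static void *example_alloc_aligned_page(size_t page_size)
{
	void *buf;

	/* posix_memalign() returns 0 on success and fills in a pointer whose
	 * address is a multiple of the requested alignment:
	 */
	if (posix_memalign(&buf, 512, page_size) != 0)
		return NULL;
	return buf;
}
#endif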

xtPublic xtBool xt_ind_release(XTOpenTablePtr ot, XTIndexPtr ind, XTPageUnlockType XT_NDEBUG_UNUSED(utype), XTIndReferencePtr iref)
{
	register XTIndBlockPtr	block;

	block = iref->ir_block;

#ifdef DEBUG
	ASSERT_NS(iref->ir_xlock != 2);
	ASSERT_NS(iref->ir_updated != 2);
	if (iref->ir_updated)
		ASSERT_NS(utype == XT_UNLOCK_R_UPDATE || utype == XT_UNLOCK_W_UPDATE);
	else
		ASSERT_NS(utype == XT_UNLOCK_READ || utype == XT_UNLOCK_WRITE);
	if (iref->ir_xlock)
		ASSERT_NS(utype == XT_UNLOCK_WRITE || utype == XT_UNLOCK_W_UPDATE);
	else
		ASSERT_NS(utype == XT_UNLOCK_READ || utype == XT_UNLOCK_R_UPDATE);
#endif
	if (iref->ir_updated) {
#ifdef DEBUG
#ifdef IND_OPT_DATA_WRITTEN
		xtWord2	branch_size;
		u_int	rec_size;

		branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
		rec_size = XT_GET_INDEX_BLOCK_LEN(branch_size);

		ASSERT_NS(block->cb_min_pos <= rec_size-2);
		ASSERT_NS(block->cb_min_pos <= block->cb_max_pos);
		ASSERT_NS(block->cb_max_pos <= rec_size-2);
		ASSERT_NS(block->cb_max_pos <= XT_INDEX_PAGE_SIZE-2);
#endif
#endif
		/* The page was updated: */
		ASSERT_NS(IDX_CAC_MODIFYABLE(block->cb_state));
		if (block->cb_state != IDX_CAC_BLOCK_DIRTY) {
			TRACK_BLOCK_WRITE(offset);
			xt_spinlock_lock(&ind->mi_dirty_lock);
			if ((block->cb_dirty_next = ind->mi_dirty_list))
				ind->mi_dirty_list->cb_dirty_prev = block;
			block->cb_dirty_prev = NULL;
			ind->mi_dirty_list = block;
			ind->mi_dirty_blocks++;
			xt_spinlock_unlock(&ind->mi_dirty_lock);
			if (block->cb_state != IDX_CAC_BLOCK_LOGGED) {
				ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_CLEAN);
				ot->ot_thread->st_statistics.st_ind_cache_dirty++;
			}
			block->cb_state = IDX_CAC_BLOCK_DIRTY;
		}
	}

	XT_IPAGE_UNLOCK(&block->cb_lock, iref->ir_xlock);
#ifdef DEBUG
	iref->ir_xlock = 2;
	iref->ir_updated = 2;
#endif
	return OK;
}
1656
 
 
1657
 
xtPublic xtBool xt_ind_reserve(XTOpenTablePtr ot, u_int count, XTIdxBranchDPtr not_this)
1658
 
{
1659
 
        register XTIndBlockPtr  block;
1660
 
        register DcGlobalsRec   *dcg = &ind_cac_globals;
1661
 
 
1662
 
#ifdef XT_TRACK_INDEX_UPDATES
1663
 
        ot->ot_ind_reserved = count;
1664
 
        ot->ot_ind_reads = 0;
1665
 
#endif
1666
 
#ifdef DEBUG_CHECK_IND_CACHE
1667
 
        xt_ind_check_cache(NULL);
1668
 
#endif
1669
 
        while (ot->ot_ind_res_count < count) {
1670
 
                if (!dcg->cg_free_list) {
1671
 
                        if (!ind_cac_free_lru_blocks(ot, count - ot->ot_ind_res_count, not_this)) {
1672
 
                                if (!dcg->cg_free_list) {
1673
 
                                        xt_ind_free_reserved(ot);
1674
 
                                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_INDEX_CACHE);
1675
 
#ifdef DEBUG_CHECK_IND_CACHE
1676
 
                                        xt_ind_check_cache(NULL);
1677
 
#endif
1678
 
                                        return FAILED;
1679
 
                                }
1680
 
                        }
1681
 
                }
1682
 
 
1683
 
                /* Get a free block: */
1684
 
                xt_lock_mutex_ns(&dcg->cg_lock);
1685
 
                while (ot->ot_ind_res_count < count && (block = dcg->cg_free_list)) {
1686
 
                        ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_FREE);
1687
 
                        dcg->cg_free_list = block->cb_next;
1688
 
                        dcg->cg_free_count--;
1689
 
                        block->cb_next = ot->ot_ind_res_bufs;
1690
 
                        ot->ot_ind_res_bufs = block;
1691
 
                        ot->ot_ind_res_count++;
1692
 
#ifdef DEBUG_CHECK_IND_CACHE
1693
 
                        dcg->cg_reserved_by_ots++;
1694
 
#endif
1695
 
                }
1696
 
                xt_unlock_mutex_ns(&dcg->cg_lock);
1697
 
        }
1698
 
#ifdef DEBUG_CHECK_IND_CACHE
1699
 
        xt_ind_check_cache(NULL);
1700
 
#endif
1701
 
        return OK;
1702
 
}
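
/* xt_ind_reserve() above pre-claims free blocks for an operation by moving
 * them, under the cache mutex, from the global free list onto a per-handle
 * list, so a multi-page index update cannot run out of cache half way
 * through. The core list transfer looks like this minimal sketch
 * (ExampleNode and example_reserve are illustrative only, not PBXT code):
 */
#if 0
typedef struct ExampleNode {
	struct ExampleNode	*next;
} ExampleNode;

/* Move up to 'count' nodes from *free_list to *reserved; returns how many
 * nodes were actually moved. The caller is assumed to hold the lock that
 * protects *free_list.
 */
static unsigned int example_reserve(ExampleNode **free_list, ExampleNode **reserved, unsigned int count)
{
	unsigned int	moved = 0;
	ExampleNode		*node;

	while (moved < count && (node = *free_list)) {
		*free_list = node->next;
		node->next = *reserved;
		*reserved = node;
		moved++;
	}
	return moved;
}
#endif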

xtPublic void xt_ind_free_reserved(XTOpenTablePtr ot)
{
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
	if (ot->ot_ind_res_bufs) {
		register XTIndBlockPtr	block, fblock;
		register DcGlobalsRec	*dcg = &ind_cac_globals;

		xt_lock_mutex_ns(&dcg->cg_lock);
		block = ot->ot_ind_res_bufs;
		while (block) {
			fblock = block;
			block = block->cb_next;

			fblock->cb_next = dcg->cg_free_list;
			dcg->cg_free_list = fblock;
#ifdef DEBUG_CHECK_IND_CACHE
			dcg->cg_reserved_by_ots--;
#endif
			dcg->cg_free_count++;
		}
		xt_unlock_mutex_ns(&dcg->cg_lock);
		ot->ot_ind_res_bufs = NULL;
		ot->ot_ind_res_count = 0;
	}
#ifdef DEBUG_CHECK_IND_CACHE
	xt_ind_check_cache(NULL);
#endif
}

xtPublic void xt_ind_unreserve(XTOpenTablePtr ot)
{
	if (!ind_cac_globals.cg_free_list)
		xt_ind_free_reserved(ot);
}