~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/tabcache_xt.cc

  • Committer: Stewart Smith
  • Date: 2008-11-21 16:06:07 UTC
  • mto: This revision was merged to the branch mainline in revision 593.
  • Revision ID: stewart@flamingspork.com-20081121160607-n6gdlt013spuo54r
remove mysql_frm_type
and fix engines to return correct value from delete_table when table doesn't exist.
(it should be ENOENT).

Also fix up some tests that manipulated frm files by hand. These tests are no longer valid and will need to be rewritten in the not too distant future.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2007 PrimeBase Technologies GmbH
2
 
 *
3
 
 * PrimeBase XT
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 
 *
19
 
 * 2007-10-30   Paul McCullagh
20
 
 *
21
 
 * H&G2JCtL
22
 
 *
23
 
 * The new table cache. Caches all non-index data. This includes the data
24
 
 * files and the row pointer files.
25
 
 */
26
 
 
27
 
#include "xt_config.h"
28
 
 
29
 
#ifdef DRIZZLED
30
 
#include <bitset>
31
 
#endif
32
 
 
33
 
#include <signal.h>
34
 
 
35
 
#include "pthread_xt.h"
36
 
#include "tabcache_xt.h"
37
 
#include "table_xt.h"
38
 
#include "database_xt.h"
39
 
#include "trace_xt.h"
40
 
#include "myxt_xt.h"
41
 
#include "lock_xt.h"
42
 
#include "strutil_xt.h"
43
 
 
44
 
#ifdef DEBUG
45
 
//#define TRACE_DISTRIBUTION
46
 
#endif
47
 
 
48
 
#ifdef TRACE_DISTRIBUTION
49
 
static void tabc_init_dist(XTTabCachePtr cac, XTFilePtr file);
50
 
static void tabc_dist_change(XTOpenFilePtr file, xtRefID ref_id);
51
 
#endif
52
 
 
53
 
/* The single, global table cache state used by every function in this file. */
xtPublic XTTabCacheMemRec       xt_tab_cache;
54
 
 
55
 
/* Forward declaration; the definition is further down in this file. */
static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs);
56
 
 
57
 
xtPublic void xt_tc_set_cache_size(size_t cache_size)
58
 
{
59
 
        xt_tab_cache.tcm_cache_size = cache_size;
60
 
        /* Multiplying by this number can overflow a 4 byte value! */
61
 
        xt_tab_cache.tcm_low_level = (size_t) ((xtWord8) cache_size * (xtWord8) 70 / (xtWord8) 100);    // Current 70%
62
 
        xt_tab_cache.tcm_high_level = (size_t) ((xtWord8) cache_size * 95 / (xtWord8) 100);                             // Current 95%
63
 
        xt_tab_cache.tcm_mid_level = (size_t) ((xtWord8) cache_size * 85 / (xtWord8) 100);                              // Current 85%
64
 
}
65
 
 
66
 
/*
67
 
 * Initialize the disk cache.
68
 
 */
69
 
xtPublic void xt_tc_init(XTThreadPtr self, size_t cache_size)
70
 
{
71
 
        xt_tc_set_cache_size(cache_size);
72
 
 
73
 
        xt_tab_cache.tcm_approx_page_count = cache_size / sizeof(XTTabCachePageRec);
74
 
        /* Determine the size of the hash table.
75
 
         * The size is set to 2* the number of pages!
76
 
         */
77
 
        xt_tab_cache.tcm_hash_size = (xt_tab_cache.tcm_approx_page_count * 2) / XT_TC_SEGMENT_COUNT;
78
 
 
79
 
        try_(a) {
80
 
                for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
81
 
                        xt_tab_cache.tcm_segment[i].tcs_cache_in_use = 0;
82
 
                        xt_tab_cache.tcm_segment[i].tcs_hash_table = (XTTabCachePagePtr *) xt_calloc(self, xt_tab_cache.tcm_hash_size * sizeof(XTTabCachePagePtr));
83
 
                        TAB_CAC_INIT_LOCK(self, &xt_tab_cache.tcm_segment[i].tcs_lock);
84
 
                }
85
 
 
86
 
                xt_init_mutex_with_autoname(self, &xt_tab_cache.tcm_lock);
87
 
                xt_init_cond(self, &xt_tab_cache.tcm_cond);
88
 
                xt_init_mutex_with_autoname(self, &xt_tab_cache.tcm_freeer_lock);
89
 
                xt_init_cond(self, &xt_tab_cache.tcm_freeer_cond);
90
 
        }
91
 
        catch_(a) {
92
 
                xt_tc_exit(self);
93
 
                throw_();
94
 
        }
95
 
        cont_(a);
96
 
}
97
 
 
98
 
xtPublic void xt_tc_exit(XTThreadPtr self)
99
 
{
100
 
        XTTabCacheSegPtr seg;
101
 
 
102
 
        for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
103
 
                seg = &xt_tab_cache.tcm_segment[i];
104
 
                if (seg->tcs_hash_table) {
105
 
                        XTTabCachePagePtr page, tmp_page;
106
 
 
107
 
                        for (size_t j=0; j<xt_tab_cache.tcm_hash_size; j++) {
108
 
                                page = seg->tcs_hash_table[j];
109
 
                                while (page) {
110
 
                                        tmp_page = page;
111
 
                                        page = page->tcp_next;
112
 
                                        ASSERT_NS(seg->tcs_cache_in_use >= offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
113
 
                                        seg->tcs_cache_in_use -= (offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
114
 
                                        ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
115
 
                                        xt_free(self, tmp_page);
116
 
                                }
117
 
                        }
118
 
 
119
 
#ifdef CHECK_DOUBLE_READ
120
 
                        printf("reads = %d, not req. = %d %% unrequired = %.2f\n", seg->tcs_total_reads, seg->tcs_read_not_req, 
121
 
                                (double) seg->tcs_read_not_req / (double) seg->tcs_total_reads);
122
 
#endif
123
 
                        xt_free(self, seg->tcs_hash_table);
124
 
                        seg->tcs_hash_table = NULL;
125
 
                        TAB_CAC_FREE_LOCK(self, &seg->tcs_lock);
126
 
                }
127
 
                ASSERT_NS(seg->tcs_cache_in_use == 0);
128
 
        }
129
 
 
130
 
        xt_free_mutex(&xt_tab_cache.tcm_lock);
131
 
        xt_free_cond(&xt_tab_cache.tcm_cond);
132
 
        xt_free_mutex(&xt_tab_cache.tcm_freeer_lock);
133
 
        xt_free_cond(&xt_tab_cache.tcm_freeer_cond);
134
 
}
135
 
 
136
 
xtPublic xtInt8 xt_tc_get_usage()
137
 
{
138
 
        xtInt8 size = 0;
139
 
 
140
 
        for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
141
 
                size += xt_tab_cache.tcm_segment[i].tcs_cache_in_use;
142
 
        }
143
 
        return size;
144
 
}
145
 
 
146
 
xtPublic xtInt8 xt_tc_get_size()
147
 
{
148
 
        return (xtInt8) xt_tab_cache.tcm_cache_size;
149
 
}
150
 
 
151
 
xtPublic xtInt8 xt_tc_get_high()
152
 
{
153
 
        return (xtInt8) xt_tab_cache.tcm_cache_high;
154
 
}
155
 
 
156
 
#ifdef DEBUG
/*
 * Debug-only consistency check of the global LRU list.
 *
 * Holding tcm_lock, walk from the least recently used page towards the
 * most recently used one and assert that every back link matches and
 * that the walk ends on tcm_mru_page. If 'tab' is given, pages that
 * belong to that table are also checked against the table's next
 * operation sequence number.
 */
xtPublic void xt_check_table_cache(XTTableHPtr tab)
{
        XTTabCachePagePtr prev = NULL;

        xt_lock_mutex_ns(&xt_tab_cache.tcm_lock);
        for (XTTabCachePagePtr cur = xt_tab_cache.tcm_lru_page; cur; cur = cur->tcp_mr_used) {
                if (tab && cur->tcp_db_id == tab->tab_db->db_id && cur->tcp_tab_id == tab->tab_id) {
                        ASSERT_NS(!XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, cur->tcp_op_seq));
                }
                ASSERT_NS(cur->tcp_lr_used == prev);
                prev = cur;
        }
        ASSERT_NS(xt_tab_cache.tcm_mru_page == prev);
        xt_unlock_mutex_ns(&xt_tab_cache.tcm_lock);
}
#endif
178
 
 
179
 
void XTTabCache::xt_tc_setup(XTTableHPtr tab, xtBool rec_file, size_t head_size, size_t rec_size)
180
 
{
181
 
        tci_table = tab;
182
 
        tci_rec_file = rec_file;
183
 
        tci_header_size = head_size;
184
 
        tci_rec_size = rec_size;
185
 
        tci_rows_per_page = (XT_TC_PAGE_SIZE / rec_size) + 1;
186
 
        if (tci_rows_per_page < 2)
187
 
                tci_rows_per_page = 2;
188
 
        tci_page_size = tci_rows_per_page * rec_size;
189
 
 
190
 
#ifdef TRACE_DISTRIBUTION
191
 
        tabc_init_dist(this, tab->tab_rec_file);
192
 
#endif
193
 
}
194
 
 
195
 
/*
196
 
 * This function assumes that we never write past the boundary of a page.
197
 
 * This should be the case, because we should never write more than
198
 
 * a row, and there are only whole rows on a page.
199
 
 */
200
 
xtBool XTTabCache::xt_tc_write(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t inc, size_t size, xtWord1 *data, xtOpSeqNo *op_seq, xtBool read, XTThreadPtr thread)
201
 
{
202
 
        size_t                          offset;
203
 
        XTTabCachePagePtr       page;
204
 
        XTTabCacheSegPtr        seg;
205
 
 
206
 
#ifdef TRACE_DISTRIBUTION
207
 
        tabc_dist_change(file, ref_id);
208
 
#endif
209
 
        /*
210
 
        retry:
211
 
        */
212
 
        if (!tc_fetch(file, ref_id, &seg, &page, &offset, read, thread))
213
 
                return FAILED;
214
 
        /* Don't write while there is a read lock on the page,
215
 
         * which can happen during a sequential scan...
216
 
         *
217
 
         * This will have to be OK.
218
 
         * I cannot wait for the lock because a thread locks
219
 
         * itself out when updating during a sequential scan.
220
 
         *
221
 
         * However, I don't think this is a problem, because
222
 
         * the only records that are changed, are records
223
 
         * containing uncommitted data. Such records should
224
 
         * be ignored by a sequential scan. As long as
225
 
         * we don't crash due to reading half written
226
 
         * data!
227
 
         *
228
 
        if (page->tcp_lock_count) {
229
 
                if (!xt_timed_wait_cond_ns(&seg->tcs_cond, &seg->tcs_lock, 100)) {
230
 
                        xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id);
231
 
                        return FAILED;
232
 
                }
233
 
                xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id);
234
 
                // The page may have dissappeared from the cache, while we were sleeping!
235
 
                goto retry;
236
 
        }
237
 
        */
238
 
        
239
 
        ASSERT_NS(offset + inc + 4 <= tci_page_size);
240
 
        memcpy(page->tcp_data + offset + inc, data, size);
241
 
        /* GOTCHA, this was "op_seq > page->tcp_op_seq", however
242
 
         * this does not handle overflow!
243
 
        if (XTTableSeq::xt_op_is_before(page->tcp_op_seq, op_seq))
244
 
                page->tcp_op_seq = op_seq;
245
 
         */
246
 
 
247
 
        page->tcp_dirty = TRUE;
248
 
        ASSERT_NS(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id);
249
 
        *op_seq = tci_table->tab_seq.ts_set_op_seq(page);
250
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
251
 
        return OK;
252
 
}
253
 
 
254
 
/*
255
 
 * This is a special version of write which is used to set the "clean" bit.
256
 
 * The alternative would be to read the record first, but this
257
 
 * is much quicker!
258
 
 *
259
 
 * This function also checks if xn_id, row_id and other data match (the checks 
260
 
 * are similar to xn_sw_cleanup_done) before modifying the record, otherwise it 
261
 
 * assumes that the record was already updated earlier and we must not set it to 
262
 
 * clean.
263
 
 *
264
 
 * If the record was not modified the function returns FALSE.
265
 
 *
266
 
 * The function has a self pointer and can throw an exception.
267
 
 */
268
 
xtBool XTTabCache::xt_tc_write_cond(XTThreadPtr self, XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord1 new_type, xtOpSeqNo *op_seq, 
269
 
        xtXactID xn_id, xtRowID row_id, u_int stat_id, u_int rec_type)
270
 
{
271
 
        size_t                          offset;
272
 
        XTTabCachePagePtr       page;
273
 
        XTTabCacheSegPtr        seg;
274
 
        XTTabRecHeadDPtr        rec_head;
275
 
 
276
 
#ifdef TRACE_DISTRIBUTION
277
 
        tabc_dist_change(file, ref_id);
278
 
#endif
279
 
        if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, self))
280
 
                xt_throw(self);
281
 
 
282
 
        ASSERT(offset + 1 <= tci_page_size);
283
 
 
284
 
        rec_head = (XTTabRecHeadDPtr)(page->tcp_data + offset);
285
 
 
286
 
        /* Transaction must match: */
287
 
        if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
288
 
                goto no_change;
289
 
 
290
 
        /* Record header must match expected value from
291
 
         * log or clean has been done, or is not required.
292
 
         *
293
 
         * For example, it is not required if a record
294
 
         * has been overwritten in a transaction.
295
 
         */
296
 
        if (rec_head->tr_rec_type_1 != rec_type ||
297
 
                rec_head->tr_stat_id_1 != stat_id)
298
 
                goto no_change;
299
 
 
300
 
        /* Row must match: */
301
 
        if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
302
 
                goto no_change;
303
 
 
304
 
        *(page->tcp_data + offset) = new_type;
305
 
 
306
 
        page->tcp_dirty = TRUE;
307
 
        ASSERT(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id);
308
 
        *op_seq = tci_table->tab_seq.ts_set_op_seq(page);
309
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
310
 
        return TRUE;
311
 
 
312
 
        no_change:
313
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
314
 
        return FALSE;
315
 
}
316
 
 
317
 
xtBool XTTabCache::xt_tc_read(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t size, xtWord1 *data, XTThreadPtr thread)
318
 
{
319
 
#ifdef XT_USE_ROW_REC_MMAP_FILES
320
 
        return tc_read_direct(file, ref_id, size, data, thread);
321
 
#else
322
 
        size_t                          offset;
323
 
        XTTabCachePagePtr       page;
324
 
        XTTabCacheSegPtr        seg;
325
 
 
326
 
        if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
327
 
                return FAILED;
328
 
        /* A read must be completely on a page: */
329
 
        ASSERT_NS(offset + size <= tci_page_size);
330
 
        memcpy(data, page->tcp_data + offset, size);
331
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
332
 
        return OK;
333
 
#endif
334
 
}
335
 
 
336
 
xtBool XTTabCache::xt_tc_read_4(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord4 *value, XTThreadPtr thread)
337
 
{
338
 
#ifdef XT_USE_ROW_REC_MMAP_FILES
339
 
        register u_int                          page_idx;
340
 
        register XTTabCachePagePtr      page;
341
 
        register XTTabCacheSegPtr       seg;
342
 
        register u_int                          hash_idx;
343
 
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
344
 
        off_t                                           address;
345
 
 
346
 
        ASSERT_NS(ref_id);
347
 
        ref_id--;
348
 
        page_idx = ref_id / this->tci_rows_per_page;
349
 
        address = (off_t) ref_id * (off_t) this->tci_rec_size + (off_t) this->tci_header_size;
350
 
 
351
 
        hash_idx = page_idx + (file->fr_id * 223);
352
 
        seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
353
 
        hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;
354
 
 
355
 
        TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
356
 
        page = seg->tcs_hash_table[hash_idx];
357
 
        while (page) {
358
 
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
359
 
                        size_t  offset;
360
 
                        xtWord1 *buffer;
361
 
 
362
 
                        offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
363
 
                        ASSERT_NS(offset + 4 <= this->tci_page_size);
364
 
                        buffer = page->tcp_data + offset;
365
 
                        *value = XT_GET_DISK_4(buffer);
366
 
                        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
367
 
                        return OK;
368
 
                }
369
 
                page = page->tcp_next;
370
 
        }
371
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
372
 
 
373
 
        return xt_pread_file_4(file, address, value, &thread->st_statistics.st_rec, thread);
374
 
#else
375
 
        size_t                          offset;
376
 
        XTTabCachePagePtr       page;
377
 
        XTTabCacheSegPtr        seg;
378
 
        xtWord1                         *data;
379
 
 
380
 
        if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
381
 
                return FAILED;
382
 
        /* A read must be completely on a page: */
383
 
        ASSERT_NS(offset + 4 <= tci_page_size);
384
 
        data = page->tcp_data + offset;
385
 
        *value = XT_GET_DISK_4(data);
386
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
387
 
        return OK;
388
 
#endif
389
 
}
390
 
 
391
 
xtBool XTTabCache::xt_tc_get_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtBool load, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread)
392
 
{
393
 
        XTTabCachePagePtr       page;
394
 
        XTTabCacheSegPtr        seg;
395
 
 
396
 
        if (load) {
397
 
                if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread))
398
 
                        return FAILED;
399
 
        }
400
 
        else {
401
 
                if (!tc_fetch_direct(file, ref_id, &seg, &page, offset, thread))
402
 
                        return FAILED;
403
 
                if (!seg) {
404
 
                        *ret_page = NULL;
405
 
                        return OK;
406
 
                }
407
 
        }
408
 
        page->tcp_lock_count++;
409
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
410
 
        *ret_page = page;
411
 
        return OK;
412
 
}
413
 
 
414
 
// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
415
 
void XTTabCache::xt_tc_release_page(XT_ROW_REC_FILE_PTR XT_UNUSED(file), XTTabCachePagePtr page, XTThreadPtr thread __attribute__((unused)))
416
 
{
417
 
        XTTabCacheSegPtr        seg;
418
 
 
419
 
        seg = &xt_tab_cache.tcm_segment[page->tcp_seg];
420
 
        TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
421
 
 
422
 
#ifdef DEBUG
423
 
        XTTabCachePagePtr lpage, ppage;
424
 
 
425
 
        ppage = NULL;
426
 
        lpage = seg->tcs_hash_table[page->tcp_hash_idx];
427
 
        while (lpage) {
428
 
                if (lpage->tcp_page_idx == page->tcp_page_idx &&
429
 
                        lpage->tcp_file_id == page->tcp_file_id)
430
 
                        break;
431
 
                ppage = lpage;
432
 
                lpage = lpage->tcp_next;
433
 
        }
434
 
 
435
 
        ASSERT_NS(page == lpage);
436
 
        ASSERT_NS(page->tcp_lock_count > 0);
437
 
#endif
438
 
 
439
 
        if (page->tcp_lock_count > 0)
440
 
                page->tcp_lock_count--;
441
 
 
442
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
443
 
}
444
 
 
445
 
// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
446
 
xtBool XTTabCache::xt_tc_lock_page(XT_ROW_REC_FILE_PTR file, XTTabCachePagePtr *ret_page, xtRefID ref_id, size_t *offset, XTThreadPtr thread __attribute__((unused)))
447
 
{
448
 
        XTTabCachePagePtr       page;
449
 
        XTTabCacheSegPtr        seg;
450
 
 
451
 
#ifdef TRACE_DISTRIBUTION
452
 
        tabc_dist_change(file, ref_id);
453
 
#endif
454
 
        if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread))
455
 
                return FAILED;
456
 
        *ret_page = page;
457
 
        return OK;
458
 
}
459
 
 
460
 
// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
461
 
void XTTabCache::xt_tc_unlock_page(XT_ROW_REC_FILE_PTR XT_UNUSED(file), XTTabCachePagePtr page, xtOpSeqNo *op_seq, XTThreadPtr thread __attribute__((unused)))
462
 
{
463
 
        XTTabCacheSegPtr        seg;
464
 
 
465
 
        seg = &xt_tab_cache.tcm_segment[page->tcp_seg];
466
 
        page->tcp_dirty = TRUE;
467
 
        *op_seq = tci_table->tab_seq.ts_set_op_seq(page);
468
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
469
 
}
470
 
 
471
 
xtBool XTTabCache::xt_tc_read_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord1 *data, XTThreadPtr thread)
472
 
{
473
 
        return tc_read_direct(file, ref_id, this->tci_page_size, data, thread);
474
 
}
475
 
 
476
 
/* Read row and record files directly.
477
 
 * This by-passed the cache when reading, which mean
478
 
 * we rely in the OS for caching.
479
 
 * This probably only makes sense when these files
480
 
 * are memory mapped.
481
 
 */
482
 
xtBool XTTabCache::tc_read_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t size, xtWord1 *data, XTThreadPtr thread)
483
 
{
484
 
        register u_int                          page_idx;
485
 
        register XTTabCachePagePtr      page;
486
 
        register XTTabCacheSegPtr       seg;
487
 
        register u_int                          hash_idx;
488
 
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
489
 
        size_t                                          red_size;
490
 
        off_t                                           address;
491
 
 
492
 
        ASSERT_NS(ref_id);
493
 
        ref_id--;
494
 
        page_idx = ref_id / this->tci_rows_per_page;
495
 
        address = (off_t) ref_id * (off_t) this->tci_rec_size + (off_t) this->tci_header_size;
496
 
 
497
 
        hash_idx = page_idx + (file->fr_id * 223);
498
 
        seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
499
 
        hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;
500
 
 
501
 
        TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
502
 
        page = seg->tcs_hash_table[hash_idx];
503
 
        while (page) {
504
 
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
505
 
                        size_t offset;
506
 
 
507
 
                        offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
508
 
                        ASSERT_NS(offset + size <= this->tci_page_size);
509
 
                        memcpy(data, page->tcp_data + offset, size);
510
 
                        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
511
 
                        return OK;
512
 
                }
513
 
                page = page->tcp_next;
514
 
        }
515
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
516
 
        if (!XT_PREAD_RR_FILE(file, address, size, 0, data, &red_size, &thread->st_statistics.st_rec, thread))
517
 
                return FAILED;
518
 
        memset(data + red_size, 0, size - red_size);
519
 
        return OK;
520
 
}
521
 
 
522
 
// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
523
 
xtBool XTTabCache::tc_fetch_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCacheSegPtr *ret_seg, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread  __attribute__((unused)))
524
 
{
525
 
        register u_int                          page_idx;
526
 
        register XTTabCachePagePtr      page;
527
 
        register XTTabCacheSegPtr       seg;
528
 
        register u_int                          hash_idx;
529
 
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
530
 
 
531
 
        ASSERT_NS(ref_id);
532
 
        ref_id--;
533
 
        page_idx = ref_id / this->tci_rows_per_page;
534
 
        *offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
535
 
 
536
 
        hash_idx = page_idx + (file->fr_id * 223);
537
 
        seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
538
 
        hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;
539
 
 
540
 
        TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
541
 
        page = seg->tcs_hash_table[hash_idx];
542
 
        while (page) {
543
 
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
544
 
                        *ret_seg = seg;
545
 
                        *ret_page = page;
546
 
                        return OK;
547
 
                }
548
 
                page = page->tcp_next;
549
 
        }
550
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
551
 
        *ret_seg = NULL;
552
 
        *ret_page = NULL;
553
 
        return OK;
554
 
}
555
 
 
556
 
/*
557
 
 * Note, this function may return an exclusive, or a shared lock.
558
 
 * If the page is in cache it will return a shared lock of the segment.
559
 
 * If the page was just added to the cache it will return an
560
 
 * exclusive lock.
561
 
 */
562
 
xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCacheSegPtr *ret_seg, XTTabCachePagePtr *ret_page, size_t *offset, xtBool read, XTThreadPtr thread)
563
 
{
564
 
        register u_int                          page_idx;
565
 
        register XTTabCachePagePtr      page, new_page;
566
 
        register XTTabCacheSegPtr       seg;
567
 
        register u_int                          hash_idx;
568
 
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
569
 
        size_t                                          red_size;
570
 
        off_t                                           address;
571
 
 
572
 
        ASSERT_NS(ref_id);
573
 
        ref_id--;
574
 
        page_idx = ref_id / this->tci_rows_per_page;
575
 
        address = (off_t) page_idx * (off_t) this->tci_page_size + (off_t) this->tci_header_size;
576
 
        *offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
577
 
 
578
 
        hash_idx = page_idx + (file->fr_id * 223);
579
 
        seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
580
 
        hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;
581
 
 
582
 
        TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
583
 
        page = seg->tcs_hash_table[hash_idx];
584
 
        while (page) {
585
 
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
586
 
                        /* This page has been most recently used: */
587
 
                        if (XT_TIME_DIFF(page->tcp_ru_time, dcg->tcm_ru_now) > (dcg->tcm_approx_page_count >> 1)) {
588
 
                                /* Move to the front of the MRU list: */
589
 
                                xt_lock_mutex_ns(&dcg->tcm_lock);
590
 
 
591
 
                                page->tcp_ru_time = ++dcg->tcm_ru_now;
592
 
                                if (dcg->tcm_mru_page != page) {
593
 
                                        /* Remove from the MRU list: */
594
 
                                        if (dcg->tcm_lru_page == page)
595
 
                                                dcg->tcm_lru_page = page->tcp_mr_used;
596
 
                                        if (page->tcp_lr_used)
597
 
                                                page->tcp_lr_used->tcp_mr_used = page->tcp_mr_used;
598
 
                                        if (page->tcp_mr_used)
599
 
                                                page->tcp_mr_used->tcp_lr_used = page->tcp_lr_used;
600
 
        
601
 
                                        /* Make the page the most recently used: */
602
 
                                        if ((page->tcp_lr_used = dcg->tcm_mru_page))
603
 
                                                dcg->tcm_mru_page->tcp_mr_used = page;
604
 
                                        page->tcp_mr_used = NULL;
605
 
                                        dcg->tcm_mru_page = page;
606
 
                                        if (!dcg->tcm_lru_page)
607
 
                                                dcg->tcm_lru_page = page;
608
 
                                }
609
 
                                xt_unlock_mutex_ns(&dcg->tcm_lock);
610
 
                        }
611
 
                        *ret_seg = seg;
612
 
                        *ret_page = page;
613
 
                        thread->st_statistics.st_rec_cache_hit++;
614
 
                        return OK;
615
 
                }
616
 
                page = page->tcp_next;
617
 
        }
618
 
        
619
 
        size_t page_size = offsetof(XTTabCachePageRec, tcp_data) + this->tci_page_size;
620
 
 
621
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
622
 
        
623
 
        /* Page not found, allocate a new page: */
624
 
        if (!(new_page = (XTTabCachePagePtr) xt_malloc_ns(page_size)))
625
 
                return FAILED;
626
 
 
627
 
        /* Check the level of the cache: */
628
 
        size_t cache_used = 0;
629
 
        for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
630
 
                cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
631
 
 
632
 
        if (cache_used + page_size > dcg->tcm_cache_high)
633
 
                dcg->tcm_cache_high = cache_used;
634
 
 
635
 
        if (cache_used + page_size > dcg->tcm_cache_size) {
636
 
                XTThreadPtr self;
637
 
                time_t          now;
638
 
 
639
 
                /* Wait for the cache level to go down.
640
 
                 * If this happens, then the freeer is not working fast
641
 
                 * enough!
642
 
                 */
643
 
 
644
 
                /* But before I do this, I must flush my own log because:
645
 
                 * - The freeer might be waiting for a page to be cleaned.
646
 
                 * - The page can only be cleaned once it has been written to
647
 
                 *   the database.
648
 
                 * - The writer cannot write the page data until it has been
649
 
                 *   flushed to the log.
650
 
                 * - The log won't be flushed, unless this thread does it.
651
 
                 * So there could be a deadlock if I don't flush the log!
652
 
                 */
653
 
                if ((self = xt_get_self())) {
654
 
                        if (!xt_xlog_flush_log(tci_table->tab_db, self))
655
 
                                goto failed;
656
 
                }
657
 
 
658
 
                /* Wait for the free'er thread: */
659
 
                xt_lock_mutex_ns(&dcg->tcm_freeer_lock);
660
 
                now = time(NULL);
661
 
                do {
662
 
                        /* I have set the timeout to 2 here because of the following situation:
663
 
                         * 1. Transaction allocates an op seq
664
 
                         * 2. Transaction goes to update cache, but must wait for
665
 
                         *    cache to be freed (after this, the op would be written to
666
 
                         *    the log).
667
 
                         * 3. The free'er wants to free cache, but is waiting for the writer.
668
 
                         * 4. The writer cannot continue because an op seq is missing!
669
 
                         *    So the writer is waiting for the transaction thread to write
670
 
                         *    the op seq.
671
 
                         * - So we have a deadlock situation.
672
 
                         * - However, this situation can only occur if there is not enough
673
 
                         *   cache.
674
 
                         * The timeout helps, but will not solve the problem, unless we
675
 
                         * ignore cache level here, after a while, and just continue.
676
 
                         */
677
 
 
678
 
                        /* Wake freeer before we go to sleep: */
679
 
                        if (!dcg->tcm_freeer_busy) {
680
 
                                if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
681
 
                                        xt_log_and_clear_exception_ns();
682
 
                        }
683
 
 
684
 
                        dcg->tcm_threads_waiting++;
685
 
#ifdef DEBUG
686
 
                        if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 30000)) {
687
 
                                dcg->tcm_threads_waiting--;
688
 
                                break;
689
 
                        }
690
 
#else
691
 
                        if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 1000)) {
692
 
                                dcg->tcm_threads_waiting--;
693
 
                                break;
694
 
                        }
695
 
#endif
696
 
                        dcg->tcm_threads_waiting--;
697
 
 
698
 
                        cache_used = 0;
699
 
                        for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
700
 
                                cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
701
 
 
702
 
                        if (cache_used + page_size <= dcg->tcm_high_level)
703
 
                                break;
704
 
                        /*
705
 
                         * If there is too little cache we can get stuck here.
706
 
                         * The problem is that seq numbers are allocated before fetching a
707
 
                         * record to be updated.
708
 
                         *
709
 
                         * It can happen that we end up waiting for that seq number
710
 
                         * to be written to the log before we can continue here.
711
 
                         *
712
 
                         * This happens as follows:
713
 
                         * 1. This thread waits for the freeer.
714
 
                         * 2. The freeer cannot free a page because it has not been
715
 
                         *    written by the writter.
716
 
                         * 3. The writter cannot continue because it is waiting
717
 
                         *    for a missing sequence number.
718
 
                         * 4. The missing sequence number is the one allocated
719
 
                         *    before we entered this function!
720
 
                         * 
721
 
                         * So don't wait for more than 5 seconds here!
722
 
                         */
723
 
                }
724
 
                while (time(NULL) < now + 5);
725
 
                xt_unlock_mutex_ns(&dcg->tcm_freeer_lock);
726
 
        }
727
 
        else if (cache_used + page_size > dcg->tcm_high_level) {
728
 
                /* Wake up the freeer because the cache level,
729
 
                 * is higher than the high level.
730
 
                 */
731
 
                if (!dcg->tcm_freeer_busy) {
732
 
                        xt_lock_mutex_ns(&xt_tab_cache.tcm_freeer_lock);
733
 
                        if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
734
 
                                xt_log_and_clear_exception_ns();
735
 
                        xt_unlock_mutex_ns(&xt_tab_cache.tcm_freeer_lock);
736
 
                }
737
 
        }
738
 
 
739
 
        /* Read the page into memory.... */
740
 
        new_page->tcp_dirty = FALSE;
741
 
        new_page->tcp_seg = (xtWord1) ((page_idx + (file->fr_id * 223)) & XT_TC_SEGMENT_MASK);
742
 
#ifdef XT_CLUSTER_FREE_RECORDS
743
 
        new_page->tcp_free_rec = 0xFFFF;
744
 
#endif
745
 
        new_page->tcp_lock_count = 0;
746
 
        new_page->tcp_hash_idx = hash_idx;
747
 
        new_page->tcp_page_idx = page_idx;
748
 
        new_page->tcp_file_id = file->fr_id;
749
 
        new_page->tcp_db_id = this->tci_table->tab_db->db_id;
750
 
        new_page->tcp_tab_id = this->tci_table->tab_id;
751
 
        new_page->tcp_data_size = this->tci_page_size;
752
 
        new_page->tcp_op_seq = 0; // Value not used because not dirty
753
 
 
754
 
        if (read) {
755
 
                if (!XT_PREAD_RR_FILE(file, address, this->tci_page_size, 0, new_page->tcp_data, &red_size, &thread->st_statistics.st_rec, thread))
756
 
                        goto failed;
757
 
 
758
 
#ifdef XT_CLUSTER_FREE_RECORDS
759
 
                /* Find the first free record! */
760
 
                if (tci_rec_file) {
761
 
                        xtWord1 *buff_ptr;
762
 
                        xtWord1 *end_ptr;
763
 
 
764
 
                        buff_ptr = new_page->tcp_data;
765
 
                        end_ptr = new_page->tcp_data + red_size;
766
 
                        while (buff_ptr < end_ptr) {
767
 
                                if (XT_REC_IS_FREE(*buff_ptr)) {
768
 
                                        new_page->tcp_free_rec = (xtWord2) (buff_ptr - new_page->tcp_data);
769
 
                                        break;
770
 
                                }
771
 
                                buff_ptr += tci_rec_size;
772
 
                        }
773
 
                }
774
 
#endif
775
 
        }
776
 
#ifdef XT_MEMSET_UNUSED_SPACE
777
 
        else
778
 
                red_size = 0;
779
 
 
780
 
        /* Removing this is an optimization. It should not be required
781
 
         * to clear the unused space in the page.
782
 
         */
783
 
        memset(new_page->tcp_data + red_size, 0, this->tci_page_size - red_size);
784
 
#endif
785
 
 
786
 
        /* Add the page to the cache! */
787
 
        TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
788
 
#ifdef CHECK_DOUBLE_READ
789
 
        seg->tcs_total_reads++;
790
 
#endif
791
 
        page = seg->tcs_hash_table[hash_idx];
792
 
        while (page) {
793
 
                if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
794
 
                        /* Oops, someone else was faster! */
795
 
#ifdef CHECK_DOUBLE_READ
796
 
                        seg->tcs_read_not_req++;
797
 
#endif
798
 
                        xt_free_ns(new_page);
799
 
                        goto done_ok;
800
 
                }
801
 
                page = page->tcp_next;
802
 
        }
803
 
        page = new_page;
804
 
 
805
 
        /* Make the page the most recently used: */
806
 
        xt_lock_mutex_ns(&dcg->tcm_lock);
807
 
        page->tcp_ru_time = ++dcg->tcm_ru_now;
808
 
        if ((page->tcp_lr_used = dcg->tcm_mru_page))
809
 
                dcg->tcm_mru_page->tcp_mr_used = page;
810
 
        page->tcp_mr_used = NULL;
811
 
        dcg->tcm_mru_page = page;
812
 
        if (!dcg->tcm_lru_page)
813
 
                dcg->tcm_lru_page = page;
814
 
        xt_unlock_mutex_ns(&dcg->tcm_lock);
815
 
 
816
 
        /* Add the page to the hash table: */
817
 
        page->tcp_next = seg->tcs_hash_table[hash_idx];
818
 
        seg->tcs_hash_table[hash_idx] = page;
819
 
 
820
 
        /* GOTCHA! This increment was done just after the malloc!
821
 
         * So it was not protected by the segment lock!
822
 
         * The result was that this count was no longer reliable,
823
 
         * This resulted in the amount of cache being used becoming less, and\
824
 
         * less, because increments were lost over time!
825
 
         */
826
 
        /* Increment cache used. */
827
 
        seg->tcs_cache_in_use += page_size;
828
 
 
829
 
        done_ok:
830
 
        *ret_seg = seg;
831
 
        *ret_page = page;
832
 
#ifdef DEBUG_CHECK_CACHE
833
 
        //XT_TC_check_cache();
834
 
#endif
835
 
        thread->st_statistics.st_rec_cache_miss++;
836
 
        return OK;
837
 
 
838
 
        failed:
839
 
        xt_free_ns(new_page);
840
 
        return FAILED;
841
 
}
842
 
 
843
 
 
844
 
/* ----------------------------------------------------------------------
845
 
 * OPERATION SEQUENCE
846
 
 */
847
 
 
848
 
xtBool XTTableSeq::ts_log_no_op(XTThreadPtr thread, xtTableID tab_id, xtOpSeqNo op_seq)
849
 
{
850
 
        XTactNoOpEntryDRec      ent_rec;
851
 
        xtWord4                         sum = (xtWord4) tab_id ^ (xtWord4) op_seq;
852
 
 
853
 
        ent_rec.no_status_1 = XT_LOG_ENT_NO_OP;
854
 
        ent_rec.no_checksum_1 = XT_CHECKSUM_1(sum);
855
 
        XT_SET_DISK_4(ent_rec.no_tab_id_4, tab_id);
856
 
        XT_SET_DISK_4(ent_rec.no_op_seq_4, op_seq);
857
 
        /* TODO - If this also fails we have a problem.
858
 
         * From this point on we should actually not generate
859
 
         * any more op IDs. The problem is that the
860
 
         * some will be missing, so the writer will not
861
 
         * be able to contniue.
862
 
         */
863
 
        return xt_xlog_log_data(thread, sizeof(XTactNoOpEntryDRec), (XTXactLogBufferDPtr) &ent_rec, XT_XLOG_NO_WRITE_NO_FLUSH);
864
 
}
865
 
 
866
 
#ifdef XT_NOT_INLINE
867
 
xtOpSeqNo XTTableSeq::ts_set_op_seq(XTTabCachePagePtr page)
868
 
{
869
 
        xtOpSeqNo seq;
870
 
 
871
 
        xt_lock_mutex_ns(&ts_ns_lock);
872
 
        page->tcp_op_seq = seq = ts_next_seq++;
873
 
        xt_unlock_mutex_ns(&ts_ns_lock);
874
 
        return seq;
875
 
}
876
 
 
877
 
xtOpSeqNo XTTableSeq::ts_get_op_seq()
878
 
{
879
 
        xtOpSeqNo seq;
880
 
 
881
 
        xt_lock_mutex_ns(&ts_ns_lock);
882
 
        seq = ts_next_seq++;
883
 
        xt_unlock_mutex_ns(&ts_ns_lock);
884
 
        return seq;
885
 
}
886
 
#endif
887
 
 
888
 
#ifdef XT_NOT_INLINE
889
 
/*
890
 
 * Return TRUE if the current sequence is before the
891
 
 * target (then) sequence number. This function
892
 
 * takes into account overflow. Overflow is detected
893
 
 * by checking the difference between the 2 values.
894
 
 * If the difference is very large, then we
895
 
 * assume overflow.
896
 
 */
897
 
xtBool XTTableSeq::xt_op_is_before(register xtOpSeqNo now, register xtOpSeqNo then)
898
 
{
899
 
        ASSERT_NS(sizeof(xtOpSeqNo) == 4);
900
 
        /* The now time is being incremented.
901
 
         * If it is after the then time (which is static, then
902
 
         * it is not before!
903
 
         */
904
 
        if (now >= then) {
905
 
                if ((now - then) > (xtOpSeqNo) 0xFFFFFFFF/2)
906
 
                        return TRUE;
907
 
                return FALSE;
908
 
        }
909
 
 
910
 
        /* If it appears to be before, we still have to check
911
 
         * for overflow. If the gap is bigger then half of
912
 
         * the MAX value, then we can assume it has wrapped around
913
 
         * because we know that no then can be so far in the
914
 
         * future!
915
 
         */
916
 
        if ((then - now) > (xtOpSeqNo) 0xFFFFFFFF/2)
917
 
                return FALSE;
918
 
        return TRUE;
919
 
}
920
 
#endif
921
 
 
922
 
 
923
 
/* ----------------------------------------------------------------------
924
 
 * F R E E E R    P R O C E S S
925
 
 */
926
 
 
927
 
/*
928
 
 * Used by the writer to wake the freeer.
929
 
 */
930
 
xtPublic void xt_wr_wake_freeer(XTThreadPtr XT_UNUSED(self), XTDatabaseHPtr db)
931
 
{
932
 
        /* BUG FIX: Was using tcm_freeer_cond.
933
 
         * This is incorrect. When the freeer waits for the
934
 
         * writter, it uses the writer's condition!
935
 
         */
936
 
        xt_lock_mutex_ns(&db->db_wr_lock);
937
 
        if (!xt_broadcast_cond_ns(&db->db_wr_cond))
938
 
                xt_log_and_clear_exception_ns();
939
 
        xt_unlock_mutex_ns(&db->db_wr_lock);
940
 
/*
941
 
        xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
942
 
        pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
943
 
        if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
944
 
                xt_log_and_clear_exception_ns();
945
 
        freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
946
 
*/
947
 
}
948
 
 
949
 
/* Wait for a transaction to quit: */
950
 
static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs)
951
 
{
952
 
        if (!self->t_quit)
953
 
                xt_timed_wait_cond(NULL, &xt_tab_cache.tcm_freeer_cond, &xt_tab_cache.tcm_freeer_lock, msecs);
954
 
}
955
 
 
956
 
typedef struct TCResource {
957
 
        XTOpenTablePtr          tc_ot;
958
 
} TCResourceRec, *TCResourcePtr;
959
 
 
960
 
static void tabc_free_fr_resources(XTThreadPtr self, TCResourcePtr tc)
961
 
{
962
 
        if (tc->tc_ot) {
963
 
                xt_db_return_table_to_pool(self, tc->tc_ot);
964
 
                tc->tc_ot = NULL;
965
 
        }
966
 
}
967
 
 
968
 
static XTTableHPtr tabc_get_table(XTThreadPtr self, TCResourcePtr tc, xtDatabaseID db_id, xtTableID tab_id)
969
 
{
970
 
        XTTableHPtr     tab;
971
 
        XTDatabaseHPtr  db;
972
 
 
973
 
        if (tc->tc_ot) {
974
 
                tab = tc->tc_ot->ot_table;
975
 
                if (tab->tab_id == tab_id && tab->tab_db->db_id == db_id)
976
 
                        return tab;
977
 
 
978
 
                xt_db_return_table_to_pool(self, tc->tc_ot);
979
 
                tc->tc_ot = NULL;
980
 
        }
981
 
 
982
 
        if (!tc->tc_ot) {
983
 
                if (!(db = xt_get_database_by_id(self, db_id)))
984
 
                        return NULL;
985
 
 
986
 
                pushr_(xt_heap_release, db);
987
 
                tc->tc_ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE);
988
 
                freer_(); // xt_heap_release(db);
989
 
                if (!tc->tc_ot)
990
 
                        return NULL;
991
 
        }
992
 
 
993
 
        return tc->tc_ot->ot_table;
994
 
}
995
 
 
996
 
/*
997
 
 * Free the given page, or the least recently used page.
998
 
 * Return the amount of bytes freed.
999
 
 */
1000
 
static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc)
1001
 
{
1002
 
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
1003
 
        XTTableHPtr                                     tab = NULL;
1004
 
        XTTabCachePagePtr                       page, lpage, ppage;
1005
 
        XTTabCacheSegPtr                        seg;
1006
 
        u_int                                           page_cnt;
1007
 
        xtBool                                          was_dirty;
1008
 
 
1009
 
#ifdef DEBUG_CHECK_CACHE
1010
 
        //XT_TC_check_cache();
1011
 
#endif
1012
 
        dcg->tcm_free_try_count = 0;
1013
 
 
1014
 
        retry:
1015
 
        /* Note, handling the page is safe because
1016
 
         * there is only one free'er thread which
1017
 
         * can remove pages from the cache!
1018
 
         */
1019
 
        page_cnt = 0;
1020
 
        if (!(page = dcg->tcm_lru_page)) {
1021
 
                dcg->tcm_free_try_count = 0;
1022
 
                return 0;
1023
 
        }
1024
 
 
1025
 
        retry_2:
1026
 
        if ((was_dirty = page->tcp_dirty)) {
1027
 
                /* Do all this stuff without a lock, because to
1028
 
                 * have a lock while doing this is too expensive!
1029
 
                 */
1030
 
        
1031
 
                /* Wait for the page to be cleaned. */
1032
 
                tab = tabc_get_table(self, tc, page->tcp_db_id, page->tcp_tab_id);
1033
 
        }
1034
 
 
1035
 
        seg = &dcg->tcm_segment[page->tcp_seg];
1036
 
        TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id);
1037
 
 
1038
 
        if (page->tcp_dirty) {
1039
 
                if (!was_dirty) {
1040
 
                        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1041
 
                        goto retry_2;
1042
 
                }
1043
 
 
1044
 
                if (tab) {
1045
 
                        ASSERT(!XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq+1));
1046
 
                        /* This should never happen. However, is has been occuring,
1047
 
                         * during multi_update test on Windows.
1048
 
                         * In particular it occurs after rename of a table, during ALTER.
1049
 
                         * As if the table was not flushed before the rename!?
1050
 
                         * To guard against an infinite loop below, I will just continue here.
1051
 
                         */
1052
 
                        if (XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq+1))
1053
 
                                goto go_on;
1054
 
                        /* OK, we have the table, now we check where the current
1055
 
                         * sequence number is.
1056
 
                         */
1057
 
                        if (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, page->tcp_op_seq)) {
1058
 
                                XTDatabaseHPtr db = tab->tab_db;
1059
 
 
1060
 
                                rewait:
1061
 
                                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1062
 
 
1063
 
                                /* Flush the log, in case this is holding up the
1064
 
                                 * writer!
1065
 
                                 */
1066
 
                                if (!db->db_xlog.xlog_flush(self)) {
1067
 
                                        dcg->tcm_free_try_count = 0;
1068
 
                                        xt_throw(self);
1069
 
                                }
1070
 
 
1071
 
                                xt_lock_mutex(self, &db->db_wr_lock);
1072
 
                                pushr_(xt_unlock_mutex, &db->db_wr_lock);
1073
 
 
1074
 
                                /* The freeer is now waiting: */
1075
 
                                db->db_wr_freeer_waiting = TRUE;
1076
 
 
1077
 
                                /* If the writer is idle, wake it up. 
1078
 
                                 * The writer will commit the changes to the database
1079
 
                                 * which will allow the freeer to free up the cache.
1080
 
                                 */
1081
 
                                if (db->db_wr_idle) {
1082
 
                                        if (!xt_broadcast_cond_ns(&db->db_wr_cond))
1083
 
                                                xt_log_and_clear_exception_ns();
1084
 
                                }
1085
 
 
1086
 
                                /* Go to sleep on the writer's condition.
1087
 
                                 * The writer will wake the free'er before it goes to
1088
 
                                 * sleep!
1089
 
                                 */
1090
 
                                tab->tab_wake_freeer_op = page->tcp_op_seq;
1091
 
                                tab->tab_wr_wake_freeer = TRUE;
1092
 
                                if (!xt_timed_wait_cond_ns(&db->db_wr_cond, &db->db_wr_lock, 30000)) {
1093
 
                                        tab->tab_wr_wake_freeer = FALSE;
1094
 
                                        db->db_wr_freeer_waiting = FALSE;
1095
 
                                        xt_throw(self);
1096
 
                                }
1097
 
                                tab->tab_wr_wake_freeer = FALSE;
1098
 
                                db->db_wr_freeer_waiting = FALSE;
1099
 
                                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
1100
 
 
1101
 
                                TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id);
1102
 
                                if (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, page->tcp_op_seq))
1103
 
                                        goto rewait;
1104
 
                        }
1105
 
                        go_on:;
1106
 
                }
1107
 
        }
1108
 
 
1109
 
        /* Wait if the page is being read or locked. */
1110
 
        if (page->tcp_lock_count) {
1111
 
                /* (1) If the page is being read, then we should not free
1112
 
                 *     it immediately.
1113
 
                 * (2) If a page is locked, the locker may be waiting
1114
 
                 *     for the freeer to free some cache - this
1115
 
                 *     causes a deadlock.
1116
 
                 *
1117
 
                 * Therefore, we move on, and try to free another page...
1118
 
                 */
1119
 
                if (page_cnt < (dcg->tcm_approx_page_count >> 1)) {
1120
 
                        /* Page has not changed MRU position, and we
1121
 
                         * have looked at less than half of the pages.
1122
 
                         * Go to the next page...
1123
 
                         */
1124
 
                        if ((page = page->tcp_mr_used)) {
1125
 
                                page_cnt++;
1126
 
                                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1127
 
                                goto retry_2;
1128
 
                        }
1129
 
                }
1130
 
                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1131
 
                dcg->tcm_free_try_count++;                              
1132
 
 
1133
 
                /* Starting to spin, free the threads: */
1134
 
                if (dcg->tcm_threads_waiting) {
1135
 
                        if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
1136
 
                                xt_log_and_clear_exception_ns();
1137
 
                }
1138
 
                goto retry;
1139
 
        }
1140
 
 
1141
 
        /* Page is clean, remove from the hash table: */
1142
 
 
1143
 
        /* Find the page on the list: */
1144
 
        u_int page_idx = page->tcp_page_idx;
1145
 
        u_int file_id = page->tcp_file_id;
1146
 
 
1147
 
        ppage = NULL;
1148
 
        lpage = seg->tcs_hash_table[page->tcp_hash_idx];
1149
 
        while (lpage) {
1150
 
                if (lpage->tcp_page_idx == page_idx && lpage->tcp_file_id == file_id)
1151
 
                        break;
1152
 
                ppage = lpage;
1153
 
                lpage = lpage->tcp_next;
1154
 
        }
1155
 
 
1156
 
        if (page == lpage) {
1157
 
                /* Should be the case! */
1158
 
                if (ppage)
1159
 
                        ppage->tcp_next = page->tcp_next;
1160
 
                else
1161
 
                        seg->tcs_hash_table[page->tcp_hash_idx] = page->tcp_next;
1162
 
        }
1163
 
#ifdef DEBUG
1164
 
        else
1165
 
                ASSERT_NS(FALSE);
1166
 
#endif
1167
 
 
1168
 
        /* Remove from the MRU list: */
1169
 
        xt_lock_mutex_ns(&dcg->tcm_lock);
1170
 
        if (dcg->tcm_lru_page == page)
1171
 
                dcg->tcm_lru_page = page->tcp_mr_used;
1172
 
        if (dcg->tcm_mru_page == page)
1173
 
                dcg->tcm_mru_page = page->tcp_lr_used;
1174
 
        if (page->tcp_lr_used)
1175
 
                page->tcp_lr_used->tcp_mr_used = page->tcp_mr_used;
1176
 
        if (page->tcp_mr_used)
1177
 
                page->tcp_mr_used->tcp_lr_used = page->tcp_lr_used;
1178
 
        xt_unlock_mutex_ns(&dcg->tcm_lock);
1179
 
 
1180
 
        /* Free the page: */
1181
 
        size_t freed_space = offsetof(XTTabCachePageRec, tcp_data) + page->tcp_data_size;
1182
 
        ASSERT_NS(seg->tcs_cache_in_use >= freed_space);
1183
 
        seg->tcs_cache_in_use -= freed_space;
1184
 
        ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
1185
 
        xt_free_ns(page);
1186
 
 
1187
 
        TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1188
 
        self->st_statistics.st_rec_cache_frees++;
1189
 
        dcg->tcm_free_try_count = 0;
1190
 
        return freed_space;
1191
 
}
1192
 
 
1193
 
 
1194
 
/* Function-like macro with an empty expansion; no use of it is visible in this part of the file. */
#define CACHE_HIGH_LEVEL(d)                     
1195
 
 
1196
 
static void tabc_fr_main(XTThreadPtr self)
1197
 
{
1198
 
        register XTTabCacheMemPtr       dcg = &xt_tab_cache;
1199
 
        TCResourceRec                           tc = { 0 };
1200
 
        int                                                     i;
1201
 
 
1202
 
        xt_set_low_priority(self);
1203
 
        dcg->tcm_freeer_busy = TRUE;
1204
 
 
1205
 
        while (!self->t_quit) {         
1206
 
                size_t cache_used, freed;
1207
 
 
1208
 
                pushr_(tabc_free_fr_resources, &tc);
1209
 
 
1210
 
                while (!self->t_quit) {
1211
 
                        /* Total up the cache memory used: */
1212
 
                        cache_used = 0;
1213
 
                        for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
1214
 
                                cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
1215
 
 
1216
 
                        if (cache_used > dcg->tcm_cache_high)
1217
 
                                dcg->tcm_cache_high = cache_used;
1218
 
 
1219
 
                        /* Check if the cache usage is over 95%: */
1220
 
                        if (self->t_quit)
1221
 
                                break;
1222
 
 
1223
 
                        /* If threads are waiting then we are more aggressive about freeing
1224
 
                         * cache.
1225
 
                         */ 
1226
 
                        if (cache_used < (dcg->tcm_threads_waiting ? dcg->tcm_mid_level : dcg->tcm_high_level))
1227
 
                                break;
1228
 
 
1229
 
                        /* Reduce cache to the 75% level: */
1230
 
                        while (!self->t_quit && cache_used > dcg->tcm_low_level) {
1231
 
                                freed = tabc_free_page(self, &tc);
1232
 
                                cache_used -= freed;
1233
 
                                if (cache_used <= dcg->tcm_high_level) {
1234
 
                                        /* Wakeup any threads that are waiting for some cache to be
1235
 
                                         * freed.
1236
 
                                         */
1237
 
                                        if (dcg->tcm_threads_waiting) {
1238
 
                                                if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
1239
 
                                                        xt_log_and_clear_exception_ns();
1240
 
                                        }
1241
 
                                }
1242
 
                        }
1243
 
                }
1244
 
 
1245
 
                freer_(); // tabc_free_fr_resources(&tc)
1246
 
 
1247
 
                xt_lock_mutex(self, &dcg->tcm_freeer_lock);
1248
 
                pushr_(xt_unlock_mutex, &dcg->tcm_freeer_lock);
1249
 
 
1250
 
                if (dcg->tcm_threads_waiting) {
1251
 
                        /* Wake threads before we go to sleep: */
1252
 
                        if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
1253
 
                                xt_log_and_clear_exception_ns();
1254
 
                }
1255
 
                        
1256
 
                /* Wait for a thread that allocates data to signal
1257
 
                 * that the cache level has exceeeded the upper limit:
1258
 
                 */
1259
 
                xt_db_approximate_time = time(NULL);
1260
 
                dcg->tcm_freeer_busy = FALSE;
1261
 
                /* No idea, why, but I am getting an uneccesarry pause here.
1262
 
                 * I run DBT2 with low record cache.
1263
 
                 *
1264
 
                 * Every now and then there is a pause where the freeer is here,
1265
 
                 * and all user threads are waiting for the freeer.
1266
 
                 *
1267
 
                 * So adding the tcm_threads_waiting condition.
1268
 
                 */
1269
 
                if (dcg->tcm_threads_waiting) {
1270
 
                        cache_used = 0;
1271
 
                        for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
1272
 
                                cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
1273
 
                        if (cache_used < dcg->tcm_mid_level)
1274
 
                                tabc_fr_wait_for_cache(self, 500);
1275
 
                }
1276
 
                else
1277
 
                        tabc_fr_wait_for_cache(self, 500);
1278
 
                //tabc_fr_wait_for_cache(self, 30*1000);
1279
 
                dcg->tcm_freeer_busy = TRUE;
1280
 
                xt_db_approximate_time = time(NULL);
1281
 
                freer_(); // xt_unlock_mutex(&dcg->tcm_freeer_lock)
1282
 
        }
1283
 
}
1284
 
 
1285
 
static void *tabc_fr_run_thread(XTThreadPtr self)
1286
 
{
1287
 
        int             count;
1288
 
        void    *mysql_thread;
1289
 
 
1290
 
        myxt_wait_pbxt_plugin_slot_assigned(self);
1291
 
 
1292
 
        mysql_thread = myxt_create_thread();
1293
 
 
1294
 
        while (!self->t_quit) {
1295
 
                try_(a) {
1296
 
                        tabc_fr_main(self);
1297
 
                }
1298
 
                catch_(a) {
1299
 
                        /* This error is "normal"! */
1300
 
                        if (!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
1301
 
                                self->t_exception.e_sys_err == SIGTERM))
1302
 
                                xt_log_and_clear_exception(self);
1303
 
                }
1304
 
                cont_(a);
1305
 
 
1306
 
                /* After an exception, pause before trying again... */
1307
 
                /* Number of seconds */
1308
 
#ifdef DEBUG
1309
 
                count = 10;
1310
 
#else
1311
 
                count = 2*60;
1312
 
#endif
1313
 
                while (!self->t_quit && count > 0) {
1314
 
                        xt_db_approximate_time = time(NULL);
1315
 
                        sleep(1);
1316
 
                        count--;
1317
 
                }
1318
 
        }
1319
 
 
1320
 
   /*
1321
 
        * {MYSQL-THREAD-KILL}
1322
 
        myxt_destroy_thread(mysql_thread, TRUE);
1323
 
        */
1324
 
        return NULL;
1325
 
}
1326
 
 
1327
 
static void tabc_fr_free_thread(XTThreadPtr self, void *XT_UNUSED(data))
1328
 
{
1329
 
        if (xt_tab_cache.tcm_freeer_thread) {
1330
 
                xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
1331
 
                pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
1332
 
                xt_tab_cache.tcm_freeer_thread = NULL;
1333
 
                freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
1334
 
        }
1335
 
}
1336
 
 
1337
 
xtPublic void xt_start_freeer(XTThreadPtr self)
1338
 
{
1339
 
        xt_tab_cache.tcm_freeer_thread = xt_create_daemon(self, "free-er");
1340
 
        xt_set_thread_data(xt_tab_cache.tcm_freeer_thread, NULL, tabc_fr_free_thread);
1341
 
        xt_run_thread(self, xt_tab_cache.tcm_freeer_thread, tabc_fr_run_thread);
1342
 
}
1343
 
 
1344
 
xtPublic void xt_quit_freeer(XTThreadPtr self)
1345
 
{
1346
 
        if (xt_tab_cache.tcm_freeer_thread) {
1347
 
                xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
1348
 
                pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
1349
 
                xt_terminate_thread(self, xt_tab_cache.tcm_freeer_thread);
1350
 
                freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
1351
 
        }
1352
 
}
1353
 
 
1354
 
xtPublic void xt_stop_freeer(XTThreadPtr self)
1355
 
{
1356
 
        XTThreadPtr thr_fr;
1357
 
 
1358
 
        if (xt_tab_cache.tcm_freeer_thread) {
1359
 
                xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
1360
 
                pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
1361
 
 
1362
 
                /* This pointer is safe as long as you have the transaction lock. */
1363
 
                if ((thr_fr = xt_tab_cache.tcm_freeer_thread)) {
1364
 
                        xtThreadID tid = thr_fr->t_id;
1365
 
 
1366
 
                        /* Make sure the thread quits when woken up. */
1367
 
                        xt_terminate_thread(self, thr_fr);
1368
 
 
1369
 
                        /* Wake the freeer to get it to quit: */
1370
 
                        if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
1371
 
                                xt_log_and_clear_exception_ns();
1372
 
        
1373
 
                        freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
1374
 
 
1375
 
                        /*
1376
 
                         * GOTCHA: This is a wierd thing but the SIGTERM directed
1377
 
                         * at a particular thread (in this case the sweeper) was
1378
 
                         * being caught by a different thread and killing the server
1379
 
                         * sometimes. Disconcerting.
1380
 
                         * (this may only be a problem on Mac OS X)
1381
 
                        xt_kill_thread(thread);
1382
 
                         */
1383
 
                        xt_wait_for_thread_to_exit(tid, FALSE);
1384
 
        
1385
 
                        /* PMC - This should not be necessary to set the signal here, but in the
1386
 
                         * debugger the handler is not called!!?
1387
 
                        thr_fr->t_delayed_signal = SIGTERM;
1388
 
                        xt_kill_thread(thread);
1389
 
                         */
1390
 
                        xt_tab_cache.tcm_freeer_thread = NULL;
1391
 
                }
1392
 
                else
1393
 
                        freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
1394
 
        }
1395
 
}
1396
 
 
1397
 
xtPublic void xt_load_pages(XTThreadPtr self, XTOpenTablePtr ot)
1398
 
{
1399
 
        XTTableHPtr                     tab = ot->ot_table;
1400
 
        xtRecordID                      rec_id;
1401
 
        XTTabCachePagePtr       page;
1402
 
        XTTabCacheSegPtr        seg;
1403
 
        size_t                          poffset;
1404
 
 
1405
 
        rec_id = 1;
1406
 
        while (rec_id<tab->tab_row_eof_id) {
1407
 
                if (!tab->tab_rows.tc_fetch(ot->ot_row_file, rec_id, &seg, &page, &poffset, TRUE, self))
1408
 
                        xt_throw(self);
1409
 
                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1410
 
                rec_id += tab->tab_rows.tci_rows_per_page;
1411
 
        }
1412
 
 
1413
 
        rec_id = 1;
1414
 
        while (rec_id<tab->tab_rec_eof_id) {
1415
 
                if (!tab->tab_recs.tc_fetch(ot->ot_rec_file, rec_id, &seg, &page, &poffset, TRUE, self))
1416
 
                        xt_throw(self);
1417
 
                TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
1418
 
                rec_id += tab->tab_recs.tci_rows_per_page;
1419
 
        }
1420
 
}
1421
 
 
1422
 
#ifdef TRACE_DISTRIBUTION
1423
 
/* ----------------------------------------------------------------------
1424
 
 *
1425
 
 */
1426
 
 
1427
 
/*
 * Distribution trace configuration (only compiled with TRACE_DISTRIBUTION).
 *
 * TABC_TRACE_FILE is matched with strstr() against the last component of
 * a file's path in tabc_init_dist(); only a matching file is traced.
 * The commented-out alternatives are other names that can be selected
 * by editing this file.
 */
//#define TABC_TRACE_FILE       "stock"
1428
 
//#define TABC_TRACE_FILE               "customer"
1429
 
#define TABC_TRACE_FILE         "upd_ind_PBXT"
1430
 
//#define TABC_TRACE_FILE       "order_line"
1431
 
 
1432
 
/* Number of columns in one printed trace line. tabc_init_dist() uses it
 * both as the initial map EOF and as the divisor that determines how
 * many IDs are folded into one column.
 */
#define TABC_DISPLAY_WIDTH      180
1433
 
 
1434
 
/* Define TRACK_ROWS to trace the cache whose record size is 4 instead of
 * the cache whose record size is not 4.
 * NOTE(review): presumably size 4 identifies the row pointer file and
 * any other size the record (data) file - confirm.
 */
//#define TRACK_ROWS
1435
 
 
1436
 
#ifdef TRACK_ROWS
1437
 
/* True when a cache with record size (s) is the one to be traced. */
#define TABC_CORRECT_REC_SIZE(s)                ((s) == 4)
1438
 
#else
1439
 
/* True when a cache with record size (s) is the one to be traced. */
#define TABC_CORRECT_REC_SIZE(s)                ((s) != 4)
1440
 
#endif
1441
 
 
1442
 
/* Lock initialised in tabc_init_dist() and taken exclusively by
 * tabc_dist_output() while the change map is rendered and reset.
 */
static XTSpinXSLockRec  tabc_dist_lock;
1443
 
/* ID of the file being traced; tabc_dist_change() ignores all other files. */
static u_int                    tabc_dist_file_id;
1444
 
/* xt_trace_clock() value recorded when the last trace line was started. */
static xtWord8                  tabc_output_time;
1445
 
/* Divisor that maps a reference ID to a slot in tabc_change_map. */
static int                              tabc_recs_per_block;
1446
 
/* One past the highest slot seen so far; bounds the output loop. */
static int                              tabc_change_map_eof;
1447
 
/* Initial value of tabc_change_map_eof (TABC_DISPLAY_WIDTH). */
static int                              tabc_change_map_init_eof;
1448
 
/* Number of int slots allocated for tabc_change_map. */
static int                              tabc_change_map_size;
1449
 
/* Per-slot change counters; zeroed each time a trace line is printed. */
static int                              *tabc_change_map;
1450
 
/* Buffer of tabc_change_map_size + 1 chars used to build a trace line. */
static char                             *tabc_change_map_string;
1451
 
 
1452
 
static void tabc_init_dist(XTTabCachePtr cac, XTFilePtr file)
1453
 
{
1454
 
        if (!strstr(xt_last_name_of_path(file->fil_path), TABC_TRACE_FILE) || !TABC_CORRECT_REC_SIZE(cac->tci_rec_size))
1455
 
                return;
1456
 
 
1457
 
        xt_spinxslock_init_with_autoname(NULL, &tabc_dist_lock);
1458
 
        tabc_dist_file_id = file->fil_id;
1459
 
        tabc_output_time = xt_trace_clock();
1460
 
#ifdef TRACK_ROWS
1461
 
        tabc_recs_per_block = cac->tci_table->tab_row_eof_id / TABC_DISPLAY_WIDTH;
1462
 
#else
1463
 
        tabc_recs_per_block = cac->tci_table->tab_row_eof_id / TABC_DISPLAY_WIDTH;
1464
 
#endif
1465
 
        tabc_change_map_init_eof = TABC_DISPLAY_WIDTH;
1466
 
        tabc_change_map_eof = tabc_change_map_init_eof;
1467
 
        tabc_change_map_size = tabc_change_map_eof * 2;
1468
 
        tabc_change_map = (int *) malloc(tabc_change_map_size * sizeof(int));
1469
 
        memset(tabc_change_map, 0, tabc_change_map_size * sizeof(int));
1470
 
        tabc_change_map_string = (char *) malloc(tabc_change_map_size + 1);
1471
 
        printf("FILE: %s\n", file->fil_path);
1472
 
        printf("rec size:   %d\n", (int) cac->tci_rec_size);
1473
 
        printf("block size: %d\n", tabc_recs_per_block);
1474
 
        printf("EOF:        %d\n", tabc_change_map_eof);
1475
 
}
1476
 
 
1477
 
/* _.-~`'* */
1478
 
static void tabc_dist_output(xtWord8 now)
1479
 
{
1480
 
        int v;
1481
 
        int i;  
1482
 
        int time;
1483
 
 
1484
 
        time = (int) (now - tabc_output_time);
1485
 
        tabc_output_time = now;
1486
 
        xt_spinxslock_xlock(&tabc_dist_lock, FALSE, 0);
1487
 
        for (i=0; i<tabc_change_map_eof; i++) {
1488
 
                v = tabc_change_map[i];
1489
 
                tabc_change_map[i] = 0;
1490
 
                if (v == 0)
1491
 
                        tabc_change_map_string[i] = ' ';
1492
 
                else if (v <= 2)
1493
 
                        tabc_change_map_string[i] = '_';
1494
 
                else if (v <= 10)
1495
 
                        tabc_change_map_string[i] = '.';
1496
 
                else if (v <= 50)
1497
 
                        tabc_change_map_string[i] = '-';
1498
 
                else if (v <= 100)
1499
 
                        tabc_change_map_string[i] = '~';
1500
 
                else if (v <= 200)
1501
 
                        tabc_change_map_string[i] = '`';
1502
 
                else if (v <= 500)
1503
 
                        tabc_change_map_string[i] = '\'';
1504
 
                else
1505
 
                        tabc_change_map_string[i] = '*';
1506
 
        }
1507
 
        tabc_change_map_string[i] = 0;
1508
 
        printf("%5d %s\n", time / 1000, tabc_change_map_string);
1509
 
        xt_spinxslock_unlock(&tabc_dist_lock, TRUE);
1510
 
}
1511
 
 
1512
 
static void tabc_dist_change(XTOpenFilePtr file, xtRefID ref_id)
1513
 
{
1514
 
        xtWord8 now;
1515
 
 
1516
 
        if (file->fr_file->fil_id != tabc_dist_file_id)
1517
 
                return;
1518
 
 
1519
 
        now = xt_trace_clock();
1520
 
        if (now - tabc_output_time >= 1000000)
1521
 
                tabc_dist_output(now);
1522
 
 
1523
 
        ref_id = ref_id / tabc_recs_per_block;
1524
 
        if (ref_id < tabc_change_map_size)
1525
 
                tabc_change_map[ref_id]++;
1526
 
        if (ref_id+1 > tabc_change_map_eof)
1527
 
                tabc_change_map_eof = ref_id+1;
1528
 
}
1529
 
 
1530
 
#endif
1531
 
 
1532