/* Copyright (c) 2007 PrimeBase Technologies GmbH
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * 2007-10-30	Paul McCullagh
 *
 * The new table cache. Caches all non-index data. This includes the data
 * files and the row pointer files.
 */

#include "xt_config.h"

#include <stdio.h>
#include <signal.h>
#include <time.h>

#include "pthread_xt.h"
#include "tabcache_xt.h"
#include "database_xt.h"
#include "strutil_xt.h"

//#define TRACE_DISTRIBUTION

#ifdef TRACE_DISTRIBUTION
static void tabc_init_dist(XTTabCachePtr cac, XTFilePtr file);
static void tabc_dist_change(XTOpenFilePtr file, xtRefID ref_id);
#endif

xtPublic XTTabCacheMemRec	xt_tab_cache;

static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs);

xtPublic void xt_tc_set_cache_size(size_t cache_size)
{
	xt_tab_cache.tcm_cache_size = cache_size;
	/* Multiplying by these numbers can overflow a 4 byte value! */
	xt_tab_cache.tcm_low_level = (size_t) ((xtWord8) cache_size * (xtWord8) 70 / (xtWord8) 100);	// Currently 70%
	xt_tab_cache.tcm_high_level = (size_t) ((xtWord8) cache_size * (xtWord8) 95 / (xtWord8) 100);	// Currently 95%
	xt_tab_cache.tcm_mid_level = (size_t) ((xtWord8) cache_size * (xtWord8) 85 / (xtWord8) 100);	// Currently 85%
}
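/* Illustration (added comment, hypothetical numbers): the xtWord8 casts
 * above matter because the intermediate product can exceed 32 bits.
 * For a 64MB cache:
 *
 *   cache_size * 95 = 67108864 * 95 = 6375342080   // needs 33 bits
 *
 * On a platform where size_t is 4 bytes, the un-cast multiplication would
 * wrap around, so the levels are computed in 64 bits and truncated back
 * to size_t only after the division.
 */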
/*
 * Initialize the disk cache.
 */
xtPublic void xt_tc_init(XTThreadPtr self, size_t cache_size)
{
	xt_tc_set_cache_size(cache_size);

	xt_tab_cache.tcm_approx_page_count = cache_size / sizeof(XTTabCachePageRec);

	/* Determine the size of the hash table.
	 * The size is set to 2 * the number of pages!
	 */
	xt_tab_cache.tcm_hash_size = (xt_tab_cache.tcm_approx_page_count * 2) / XT_TC_SEGMENT_COUNT;

	for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
		xt_tab_cache.tcm_segment[i].tcs_cache_in_use = 0;
		xt_tab_cache.tcm_segment[i].tcs_hash_table = (XTTabCachePagePtr *) xt_calloc(self, xt_tab_cache.tcm_hash_size * sizeof(XTTabCachePagePtr));
		TAB_CAC_INIT_LOCK(self, &xt_tab_cache.tcm_segment[i].tcs_lock);
	}

	xt_init_mutex_with_autoname(self, &xt_tab_cache.tcm_lock);
	xt_init_cond(self, &xt_tab_cache.tcm_cond);
	xt_init_mutex_with_autoname(self, &xt_tab_cache.tcm_freeer_lock);
	xt_init_cond(self, &xt_tab_cache.tcm_freeer_cond);
}
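/* Worked example (added comment, hypothetical sizes): if the cache is 32MB
 * and sizeof(XTTabCachePageRec) were 32KB, tcm_approx_page_count would be
 * about 1024, giving roughly 2048 hash slots in total, split evenly over
 * the XT_TC_SEGMENT_COUNT per-segment tables. Two slots per page keeps the
 * tcs_hash_table collision chains short.
 */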
xtPublic void xt_tc_exit(XTThreadPtr self)
{
	XTTabCacheSegPtr	seg;

	for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
		seg = &xt_tab_cache.tcm_segment[i];
		if (seg->tcs_hash_table) {
			XTTabCachePagePtr page, tmp_page;

			for (size_t j=0; j<xt_tab_cache.tcm_hash_size; j++) {
				page = seg->tcs_hash_table[j];
				while (page) {
					tmp_page = page;
					page = page->tcp_next;
					ASSERT_NS(seg->tcs_cache_in_use >= offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
					seg->tcs_cache_in_use -= (offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
					ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
					xt_free(self, tmp_page);
				}
			}

#ifdef CHECK_DOUBLE_READ
			printf("reads = %d, not req. = %d %% unrequired = %.2f\n", seg->tcs_total_reads, seg->tcs_read_not_req,
				(double) seg->tcs_read_not_req / (double) seg->tcs_total_reads);
#endif
			xt_free(self, seg->tcs_hash_table);
			seg->tcs_hash_table = NULL;
			TAB_CAC_FREE_LOCK(self, &seg->tcs_lock);
		}
		ASSERT_NS(seg->tcs_cache_in_use == 0);
	}

	xt_free_mutex(&xt_tab_cache.tcm_lock);
	xt_free_cond(&xt_tab_cache.tcm_cond);
	xt_free_mutex(&xt_tab_cache.tcm_freeer_lock);
	xt_free_cond(&xt_tab_cache.tcm_freeer_cond);
}
xtPublic xtInt8 xt_tc_get_usage()
{
	xtInt8 size = 0;

	for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++)
		size += xt_tab_cache.tcm_segment[i].tcs_cache_in_use;
	return size;
}

xtPublic xtInt8 xt_tc_get_size()
{
	return (xtInt8) xt_tab_cache.tcm_cache_size;
}

xtPublic xtInt8 xt_tc_get_high()
{
	return (xtInt8) xt_tab_cache.tcm_cache_high;
}

xtPublic void xt_check_table_cache(XTTableHPtr tab)
{
	XTTabCachePagePtr page, ppage;

	xt_lock_mutex_ns(&xt_tab_cache.tcm_lock);
	ppage = NULL;
	page = xt_tab_cache.tcm_lru_page;
	while (page) {
		if (tab) {
			if (page->tcp_db_id == tab->tab_db->db_id && page->tcp_tab_id == tab->tab_id) {
				ASSERT_NS(!XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq));
			}
		}
		ASSERT_NS(page->tcp_lr_used == ppage);
		ppage = page;
		page = page->tcp_mr_used;
	}
	ASSERT_NS(xt_tab_cache.tcm_mru_page == ppage);
	xt_unlock_mutex_ns(&xt_tab_cache.tcm_lock);
}
void XTTabCache::xt_tc_setup(XTTableHPtr tab, xtBool rec_file, size_t head_size, size_t rec_size)
{
	tci_table = tab;
	tci_rec_file = rec_file;
	tci_header_size = head_size;
	tci_rec_size = rec_size;
	tci_rows_per_page = (XT_TC_PAGE_SIZE / rec_size) + 1;
	if (tci_rows_per_page < 2)
		tci_rows_per_page = 2;
	tci_page_size = tci_rows_per_page * rec_size;
#ifdef TRACE_DISTRIBUTION
	tabc_init_dist(this, tab->tab_rec_file);
#endif
}

/*
 * This function assumes that we never write past the boundary of a page.
 * This should be the case, because we should never write more than
 * a row, and there are only whole rows on a page.
 */
xtBool XTTabCache::xt_tc_write(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t inc, size_t size, xtWord1 *data, xtOpSeqNo *op_seq, xtBool read, XTThreadPtr thread)
{
	size_t				offset;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;

#ifdef TRACE_DISTRIBUTION
	tabc_dist_change(file, ref_id);
#endif
	if (!tc_fetch(file, ref_id, &seg, &page, &offset, read, thread))
		return FAILED;
	/* Don't write while there is a read lock on the page,
	 * which can happen during a sequential scan...
	 *
	 * This will have to be OK.
	 * I cannot wait for the lock because a thread locks
	 * itself out when updating during a sequential scan.
	 *
	 * However, I don't think this is a problem, because
	 * the only records that are changed, are records
	 * containing uncommitted data. Such records should
	 * be ignored by a sequential scan. As long as
	 * we don't crash due to reading half written
	 * data!
	 *
	if (page->tcp_lock_count) {
		if (!xt_timed_wait_cond_ns(&seg->tcs_cond, &seg->tcs_lock, 100)) {
			xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id);
			return FAILED;
		}
		xt_rwmutex_unlock(&seg->tcs_lock, thread->t_id);
		// The page may have disappeared from the cache, while we were sleeping!
		goto retry;
	}
	*/

	ASSERT_NS(offset + inc + 4 <= tci_page_size);
	memcpy(page->tcp_data + offset + inc, data, size);
	/* GOTCHA, this was "op_seq > page->tcp_op_seq", however
	 * this does not handle overflow!
	if (XTTableSeq::xt_op_is_before(page->tcp_op_seq, op_seq))
		page->tcp_op_seq = op_seq;
	*/
	page->tcp_dirty = TRUE;
	ASSERT_NS(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id);
	*op_seq = tci_table->tab_seq.ts_set_op_seq(page);
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	return OK;
}
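/* Worked example (added comment, hypothetical sizes): why a row write never
 * crosses a page boundary. If XT_TC_PAGE_SIZE were 4096 and tci_rec_size
 * 100, then xt_tc_setup() gives:
 *
 *   tci_rows_per_page = 4096/100 + 1 = 41
 *   tci_page_size     = 41 * 100    = 4100
 *
 * The page size is an exact multiple of the record size, so record N always
 * occupies bytes [offset, offset + 100) of a single page, where
 * offset = (N % 41) * 100.
 */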
/*
 * This is a special version of write which is used to set the "clean" bit.
 * The alternative would be to read the record first, but this
 * is faster!
 *
 * This function also checks if xn_id, row_id and other data match (the checks
 * are similar to xn_sw_cleanup_done) before modifying the record, otherwise it
 * assumes that the record was already updated earlier and we must not set it to
 * clean.
 *
 * If the record was not modified the function returns FALSE.
 *
 * The function has a self pointer and can throw an exception.
 */
xtBool XTTabCache::xt_tc_write_cond(XTThreadPtr self, XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord1 new_type, xtOpSeqNo *op_seq,
	xtXactID xn_id, xtRowID row_id, u_int stat_id, u_int rec_type)
{
	size_t				offset;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;
	XTTabRecHeadDPtr	rec_head;

#ifdef TRACE_DISTRIBUTION
	tabc_dist_change(file, ref_id);
#endif
	if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, self))
		xt_throw(self);

	ASSERT(offset + 1 <= tci_page_size);

	rec_head = (XTTabRecHeadDPtr) (page->tcp_data + offset);

	/* Transaction must match: */
	if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
		goto no_change;

	/* Record header must match expected value from
	 * log or clean has been done, or is not required.
	 *
	 * For example, it is not required if a record
	 * has been overwritten in a transaction.
	 */
	if (rec_head->tr_rec_type_1 != rec_type ||
		rec_head->tr_stat_id_1 != stat_id)
		goto no_change;

	/* Row must match: */
	if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
		goto no_change;

	*(page->tcp_data + offset) = new_type;
	page->tcp_dirty = TRUE;
	ASSERT(page->tcp_db_id == tci_table->tab_db->db_id && page->tcp_tab_id == tci_table->tab_id);
	*op_seq = tci_table->tab_seq.ts_set_op_seq(page);
	TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
	return TRUE;

	no_change:
	TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
	return FALSE;
}
xtBool XTTabCache::xt_tc_read(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t size, xtWord1 *data, XTThreadPtr thread)
{
#ifdef XT_USE_ROW_REC_MMAP_FILES
	return tc_read_direct(file, ref_id, size, data, thread);
#else
	size_t				offset;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;

	if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
		return FAILED;
	/* A read must be completely on a page: */
	ASSERT_NS(offset + size <= tci_page_size);
	memcpy(data, page->tcp_data + offset, size);
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	return OK;
#endif
}

xtBool XTTabCache::xt_tc_read_4(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord4 *value, XTThreadPtr thread)
{
#ifdef XT_USE_ROW_REC_MMAP_FILES
	register u_int				page_idx;
	register XTTabCachePagePtr	page;
	register XTTabCacheSegPtr	seg;
	register u_int				hash_idx;
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;
	off_t						address;

	page_idx = ref_id / this->tci_rows_per_page;
	address = (off_t) ref_id * (off_t) this->tci_rec_size + (off_t) this->tci_header_size;

	hash_idx = page_idx + (file->fr_id * 223);
	seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;

	TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			size_t	offset;
			xtWord1	*buffer;

			offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
			ASSERT_NS(offset + 4 <= this->tci_page_size);
			buffer = page->tcp_data + offset;
			*value = XT_GET_DISK_4(buffer);
			TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
			return OK;
		}
		page = page->tcp_next;
	}
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);

	return xt_pread_file_4(file, address, value, &thread->st_statistics.st_rec, thread);
#else
	size_t				offset;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;
	xtWord1				*data;

	if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
		return FAILED;
	/* A read must be completely on a page: */
	ASSERT_NS(offset + 4 <= tci_page_size);
	data = page->tcp_data + offset;
	*value = XT_GET_DISK_4(data);
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	return OK;
#endif
}
xtBool XTTabCache::xt_tc_get_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtBool load, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread)
{
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;

	if (load) {
		if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread))
			return FAILED;
	}
	else {
		if (!tc_fetch_direct(file, ref_id, &seg, &page, offset, thread))
			return FAILED;
		if (!seg) {
			*ret_page = NULL;
			return OK;
		}
	}

	page->tcp_lock_count++;
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	*ret_page = page;
	return OK;
}

// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
void XTTabCache::xt_tc_release_page(XT_ROW_REC_FILE_PTR XT_UNUSED(file), XTTabCachePagePtr page, XTThreadPtr thread __attribute__((unused)))
{
	XTTabCacheSegPtr seg;

	seg = &xt_tab_cache.tcm_segment[page->tcp_seg];
	TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);

#ifdef DEBUG
	XTTabCachePagePtr lpage, ppage;

	ppage = NULL;
	lpage = seg->tcs_hash_table[page->tcp_hash_idx];
	while (lpage) {
		if (lpage->tcp_page_idx == page->tcp_page_idx &&
			lpage->tcp_file_id == page->tcp_file_id)
			break;
		ppage = lpage;
		lpage = lpage->tcp_next;
	}

	ASSERT_NS(page == lpage);
	ASSERT_NS(page->tcp_lock_count > 0);
#endif

	if (page->tcp_lock_count > 0)
		page->tcp_lock_count--;

	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
}

// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
xtBool XTTabCache::xt_tc_lock_page(XT_ROW_REC_FILE_PTR file, XTTabCachePagePtr *ret_page, xtRefID ref_id, size_t *offset, XTThreadPtr thread __attribute__((unused)))
{
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;

#ifdef TRACE_DISTRIBUTION
	tabc_dist_change(file, ref_id);
#endif
	if (!tc_fetch(file, ref_id, &seg, &page, offset, TRUE, thread))
		return FAILED;
	*ret_page = page;
	return OK;
}

// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
void XTTabCache::xt_tc_unlock_page(XT_ROW_REC_FILE_PTR XT_UNUSED(file), XTTabCachePagePtr page, xtOpSeqNo *op_seq, XTThreadPtr thread __attribute__((unused)))
{
	XTTabCacheSegPtr seg;

	seg = &xt_tab_cache.tcm_segment[page->tcp_seg];
	page->tcp_dirty = TRUE;
	*op_seq = tci_table->tab_seq.ts_set_op_seq(page);
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
}

xtBool XTTabCache::xt_tc_read_page(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, xtWord1 *data, XTThreadPtr thread)
{
	return tc_read_direct(file, ref_id, this->tci_page_size, data, thread);
}
/* Read row and record files directly.
 * This by-passes the cache when reading, which means
 * we rely on the OS for caching.
 * This probably only makes sense when these files
 * are memory mapped.
 */
xtBool XTTabCache::tc_read_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, size_t size, xtWord1 *data, XTThreadPtr thread)
{
	register u_int				page_idx;
	register XTTabCachePagePtr	page;
	register XTTabCacheSegPtr	seg;
	register u_int				hash_idx;
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;
	size_t						red_size;
	off_t						address;

	page_idx = ref_id / this->tci_rows_per_page;
	address = (off_t) ref_id * (off_t) this->tci_rec_size + (off_t) this->tci_header_size;

	hash_idx = page_idx + (file->fr_id * 223);
	seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;

	TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			size_t offset;

			offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;
			ASSERT_NS(offset + size <= this->tci_page_size);
			memcpy(data, page->tcp_data + offset, size);
			TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
			return OK;
		}
		page = page->tcp_next;
	}
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	if (!XT_PREAD_RR_FILE(file, address, size, 0, data, &red_size, &thread->st_statistics.st_rec, thread))
		return FAILED;
	memset(data + red_size, 0, size - red_size);
	return OK;
}

// Depending on platform 'thread->t_id' may not be used by TAB_CAC_WRITE_LOCK().
xtBool XTTabCache::tc_fetch_direct(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCacheSegPtr *ret_seg, XTTabCachePagePtr *ret_page, size_t *offset, XTThreadPtr thread __attribute__((unused)))
{
	register u_int				page_idx;
	register XTTabCachePagePtr	page;
	register XTTabCacheSegPtr	seg;
	register u_int				hash_idx;
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;

	page_idx = ref_id / this->tci_rows_per_page;
	*offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;

	hash_idx = page_idx + (file->fr_id * 223);
	seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;

	TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			*ret_seg = seg;
			*ret_page = page;
			return OK;
		}
		page = page->tcp_next;
	}
	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
	*ret_seg = NULL;
	*ret_page = NULL;
	return OK;
}
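/* Illustration (added comment): how a (file, ref_id) pair maps to a segment
 * and a hash slot in the functions above:
 *
 *   page_idx = ref_id / tci_rows_per_page;              // cache page index
 *   h        = page_idx + file->fr_id * 223;            // mix in the file id
 *   segment  = h & XT_TC_SEGMENT_MASK;                  // low bits pick segment
 *   slot     = (h >> XT_TC_SEGMENT_SHIFTS) % hash_size; // rest picks the slot
 *
 * Because the low bits select the segment (and its lock), consecutive pages
 * of one file are spread over different segments, reducing lock contention.
 */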
/*
 * Note, this function may return an exclusive, or a shared lock.
 * If the page is in cache it will return a shared lock of the segment.
 * If the page was just added to the cache it will return an
 * exclusive lock of the segment.
 */
xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCacheSegPtr *ret_seg, XTTabCachePagePtr *ret_page, size_t *offset, xtBool read, XTThreadPtr thread)
{
	register u_int				page_idx;
	register XTTabCachePagePtr	page, new_page;
	register XTTabCacheSegPtr	seg;
	register u_int				hash_idx;
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;
	size_t						red_size;
	off_t						address;

	page_idx = ref_id / this->tci_rows_per_page;
	address = (off_t) page_idx * (off_t) this->tci_page_size + (off_t) this->tci_header_size;
	*offset = (ref_id % this->tci_rows_per_page) * this->tci_rec_size;

	hash_idx = page_idx + (file->fr_id * 223);
	seg = &dcg->tcm_segment[hash_idx & XT_TC_SEGMENT_MASK];
	hash_idx = (hash_idx >> XT_TC_SEGMENT_SHIFTS) % dcg->tcm_hash_size;

	TAB_CAC_READ_LOCK(&seg->tcs_lock, thread->t_id);
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			/* This page has been most recently used: */
			if (XT_TIME_DIFF(page->tcp_ru_time, dcg->tcm_ru_now) > (dcg->tcm_approx_page_count >> 1)) {
				/* Move to the front of the MRU list: */
				xt_lock_mutex_ns(&dcg->tcm_lock);

				page->tcp_ru_time = ++dcg->tcm_ru_now;
				if (dcg->tcm_mru_page != page) {
					/* Remove from the MRU list: */
					if (dcg->tcm_lru_page == page)
						dcg->tcm_lru_page = page->tcp_mr_used;
					if (page->tcp_lr_used)
						page->tcp_lr_used->tcp_mr_used = page->tcp_mr_used;
					if (page->tcp_mr_used)
						page->tcp_mr_used->tcp_lr_used = page->tcp_lr_used;

					/* Make the page the most recently used: */
					if ((page->tcp_lr_used = dcg->tcm_mru_page))
						dcg->tcm_mru_page->tcp_mr_used = page;
					page->tcp_mr_used = NULL;
					dcg->tcm_mru_page = page;
					if (!dcg->tcm_lru_page)
						dcg->tcm_lru_page = page;
				}
				xt_unlock_mutex_ns(&dcg->tcm_lock);
			}

			*ret_seg = seg;
			*ret_page = page;
			thread->st_statistics.st_rec_cache_hit++;
			return OK;
		}
		page = page->tcp_next;
	}

	size_t page_size = offsetof(XTTabCachePageRec, tcp_data) + this->tci_page_size;

	TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);

	/* Page not found, allocate a new page: */
	if (!(new_page = (XTTabCachePagePtr) xt_malloc_ns(page_size)))
		return FAILED;

	/* Check the level of the cache: */
	size_t cache_used = 0;
	for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
		cache_used += dcg->tcm_segment[i].tcs_cache_in_use;

	if (cache_used + page_size > dcg->tcm_cache_high)
		dcg->tcm_cache_high = cache_used;

	if (cache_used + page_size > dcg->tcm_cache_size) {
		XTThreadPtr	self;
		time_t		now;

		/* Wait for the cache level to go down.
		 * If this happens, then the freeer is not working fast
		 * enough!
		 */

		/* But before I do this, I must flush my own log because:
		 * - The freeer might be waiting for a page to be cleaned.
		 * - The page can only be cleaned once it has been written to
		 *   the database.
		 * - The writer cannot write the page data until it has been
		 *   flushed to the log.
		 * - The log won't be flushed, unless this thread does it.
		 * So there could be a deadlock if I don't flush the log!
		 */
		if ((self = xt_get_self())) {
			if (!xt_xlog_flush_log(tci_table->tab_db, self))
				goto failed;
		}

		/* Wait for the free'er thread: */
		xt_lock_mutex_ns(&dcg->tcm_freeer_lock);
		now = time(NULL);
		do {
			/* I have set the timeout to 2 here because of the following situation:
			 * 1. Transaction allocates an op seq.
			 * 2. Transaction goes to update cache, but must wait for
			 *    cache to be freed (after this, the op would be written to
			 *    the log).
			 * 3. The free'er wants to free cache, but is waiting for the writer.
			 * 4. The writer cannot continue because an op seq is missing!
			 *    So the writer is waiting for the transaction thread to write
			 *    its operation.
			 * - So we have a deadlock situation.
			 * - However, this situation can only occur if there is not enough
			 *   cache.
			 * The timeout helps, but will not solve the problem, unless we
			 * ignore the cache level here, after a while, and just continue.
			 */

			/* Wake freeer before we go to sleep: */
			if (!dcg->tcm_freeer_busy) {
				if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
					xt_log_and_clear_exception_ns();
			}

			dcg->tcm_threads_waiting++;
#ifdef DEBUG
			if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 30000)) {
				dcg->tcm_threads_waiting--;
				break;
			}
#else
			if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 1000)) {
				dcg->tcm_threads_waiting--;
				break;
			}
#endif
			dcg->tcm_threads_waiting--;

			cache_used = 0;
			for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
				cache_used += dcg->tcm_segment[i].tcs_cache_in_use;

			if (cache_used + page_size <= dcg->tcm_high_level)
				break;

			/* If there is too little cache we can get stuck here.
			 * The problem is that seq numbers are allocated before fetching a
			 * record to be updated.
			 *
			 * It can happen that we end up waiting for that seq number
			 * to be written to the log before we can continue here.
			 *
			 * This happens as follows:
			 * 1. This thread waits for the freeer.
			 * 2. The freeer cannot free a page because it has not been
			 *    written by the writer.
			 * 3. The writer cannot continue because it is waiting
			 *    for a missing sequence number.
			 * 4. The missing sequence number is the one allocated
			 *    before we entered this function!
			 *
			 * So don't wait for more than 5 seconds here!
			 */
		}
		while (time(NULL) < now + 5);
		xt_unlock_mutex_ns(&dcg->tcm_freeer_lock);
	}
	else if (cache_used + page_size > dcg->tcm_high_level) {
		/* Wake up the freeer because the cache level
		 * is higher than the high level:
		 */
		if (!dcg->tcm_freeer_busy) {
			xt_lock_mutex_ns(&xt_tab_cache.tcm_freeer_lock);
			if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
				xt_log_and_clear_exception_ns();
			xt_unlock_mutex_ns(&xt_tab_cache.tcm_freeer_lock);
		}
	}

	/* Read the page into memory.... */
	new_page->tcp_dirty = FALSE;
	new_page->tcp_seg = (xtWord1) ((page_idx + (file->fr_id * 223)) & XT_TC_SEGMENT_MASK);
#ifdef XT_CLUSTER_FREE_RECORDS
	new_page->tcp_free_rec = 0xFFFF;
#endif
	new_page->tcp_lock_count = 0;
	new_page->tcp_hash_idx = hash_idx;
	new_page->tcp_page_idx = page_idx;
	new_page->tcp_file_id = file->fr_id;
	new_page->tcp_db_id = this->tci_table->tab_db->db_id;
	new_page->tcp_tab_id = this->tci_table->tab_id;
	new_page->tcp_data_size = this->tci_page_size;
	new_page->tcp_op_seq = 0;	// Value not used because not dirty

	if (read) {
		if (!XT_PREAD_RR_FILE(file, address, this->tci_page_size, 0, new_page->tcp_data, &red_size, &thread->st_statistics.st_rec, thread))
			goto failed;

#ifdef XT_CLUSTER_FREE_RECORDS
		/* Find the first free record! */
		xtWord1 *buff_ptr, *end_ptr;

		buff_ptr = new_page->tcp_data;
		end_ptr = new_page->tcp_data + red_size;
		while (buff_ptr < end_ptr) {
			if (XT_REC_IS_FREE(*buff_ptr)) {
				new_page->tcp_free_rec = (xtWord2) (buff_ptr - new_page->tcp_data);
				break;
			}
			buff_ptr += tci_rec_size;
		}
#endif
	}
	else
		red_size = 0;

#ifdef XT_MEMSET_UNUSED_SPACE
	/* Removing this is an optimization. It should not be required
	 * to clear the unused space in the page.
	 */
	memset(new_page->tcp_data + red_size, 0, this->tci_page_size - red_size);
#endif

	/* Add the page to the cache! */
	TAB_CAC_WRITE_LOCK(&seg->tcs_lock, thread->t_id);
#ifdef CHECK_DOUBLE_READ
	seg->tcs_total_reads++;
#endif
	page = seg->tcs_hash_table[hash_idx];
	while (page) {
		if (page->tcp_page_idx == page_idx && page->tcp_file_id == file->fr_id) {
			/* Oops, someone else was faster! */
#ifdef CHECK_DOUBLE_READ
			seg->tcs_read_not_req++;
#endif
			xt_free_ns(new_page);
			goto done_ok;
		}
		page = page->tcp_next;
	}
	page = new_page;

	/* Make the page the most recently used: */
	xt_lock_mutex_ns(&dcg->tcm_lock);
	page->tcp_ru_time = ++dcg->tcm_ru_now;
	if ((page->tcp_lr_used = dcg->tcm_mru_page))
		dcg->tcm_mru_page->tcp_mr_used = page;
	page->tcp_mr_used = NULL;
	dcg->tcm_mru_page = page;
	if (!dcg->tcm_lru_page)
		dcg->tcm_lru_page = page;
	xt_unlock_mutex_ns(&dcg->tcm_lock);

	/* Add the page to the hash table: */
	page->tcp_next = seg->tcs_hash_table[hash_idx];
	seg->tcs_hash_table[hash_idx] = page;

	/* GOTCHA! This increment was done just after the malloc!
	 * So it was not protected by the segment lock!
	 * The result was that this count was no longer reliable.
	 * This resulted in the amount of cache being used becoming less, and
	 * less, because increments were lost over time!
	 */
	/* Increment cache used. */
	seg->tcs_cache_in_use += page_size;

	done_ok:
	*ret_seg = seg;
	*ret_page = page;
#ifdef DEBUG_CHECK_CACHE
	//XT_TC_check_cache();
#endif
	thread->st_statistics.st_rec_cache_miss++;
	return OK;

	failed:
	xt_free_ns(new_page);
	return FAILED;
}
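/* Usage sketch (added comment, hypothetical caller): tc_fetch() returns with
 * the segment lock held, so every caller must unlock when done with the page:
 *
 *   XTTabCacheSegPtr  seg;
 *   XTTabCachePagePtr page;
 *   size_t            offset;
 *
 *   if (!tc_fetch(file, ref_id, &seg, &page, &offset, TRUE, thread))
 *       return FAILED;
 *   // ... read or modify page->tcp_data + offset ...
 *   TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
 *
 * The lock is shared when the page was found in the cache, and exclusive
 * when the page was just added, as noted in the comment above tc_fetch().
 */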
/* ----------------------------------------------------------------------
 * T A B L E   S E Q U E N C E
 */

xtBool XTTableSeq::ts_log_no_op(XTThreadPtr thread, xtTableID tab_id, xtOpSeqNo op_seq)
{
	XTactNoOpEntryDRec	ent_rec;
	xtWord4				sum = (xtWord4) tab_id ^ (xtWord4) op_seq;

	ent_rec.no_status_1 = XT_LOG_ENT_NO_OP;
	ent_rec.no_checksum_1 = XT_CHECKSUM_1(sum);
	XT_SET_DISK_4(ent_rec.no_tab_id_4, tab_id);
	XT_SET_DISK_4(ent_rec.no_op_seq_4, op_seq);
	/* TODO - If this also fails we have a problem.
	 * From this point on we should actually not generate
	 * any more op IDs. The problem is that then
	 * some will be missing, so the writer will not
	 * be able to continue.
	 */
	return xt_xlog_log_data(thread, sizeof(XTactNoOpEntryDRec), (XTXactLogBufferDPtr) &ent_rec, XT_XLOG_NO_WRITE_NO_FLUSH);
}

xtOpSeqNo XTTableSeq::ts_set_op_seq(XTTabCachePagePtr page)
{
	xtOpSeqNo seq;

	xt_lock_mutex_ns(&ts_ns_lock);
	page->tcp_op_seq = seq = ts_next_seq++;
	xt_unlock_mutex_ns(&ts_ns_lock);
	return seq;
}

xtOpSeqNo XTTableSeq::ts_get_op_seq()
{
	xtOpSeqNo seq;

	xt_lock_mutex_ns(&ts_ns_lock);
	seq = ts_next_seq++;
	xt_unlock_mutex_ns(&ts_ns_lock);
	return seq;
}
/*
 * Return TRUE if the current sequence is before the
 * target (then) sequence number. This function
 * takes into account overflow. Overflow is detected
 * by checking the difference between the 2 values.
 * If the difference is very large, then we
 * assume the sequence has wrapped around.
 */
xtBool XTTableSeq::xt_op_is_before(register xtOpSeqNo now, register xtOpSeqNo then)
{
	ASSERT_NS(sizeof(xtOpSeqNo) == 4);
	/* The now time is being incremented.
	 * If it is after the then time (which is static), then
	 * it can only be "before" if the value has overflowed:
	 */
	if (now >= then) {
		if ((now - then) > (xtOpSeqNo) 0xFFFFFFFF/2)
			return TRUE;
		return FALSE;
	}

	/* If it appears to be before, we still have to check
	 * for overflow. If the gap is bigger than half of
	 * the MAX value, then we can assume it has wrapped around,
	 * because we know that no then can be so far in the
	 * future!
	 */
	if ((then - now) > (xtOpSeqNo) 0xFFFFFFFF/2)
		return FALSE;
	return TRUE;
}
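/* Illustration (added comment, hypothetical values): wraparound-safe
 * comparison of 32-bit sequence numbers:
 *
 *   xt_op_is_before(5, 10)          -> TRUE   (5 is before 10)
 *   xt_op_is_before(10, 5)          -> FALSE
 *   xt_op_is_before(0xFFFFFFFE, 2)  -> TRUE   (2 means "wrapped past max")
 *   xt_op_is_before(2, 0xFFFFFFFE)  -> FALSE  (0xFFFFFFFE is in the past)
 *
 * Any gap larger than half the 32-bit range is treated as wraparound, which
 * is safe as long as live sequence numbers never differ by more than 2^31.
 */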
/* ----------------------------------------------------------------------
 * F R E E E R   P R O C E S S
 */

/*
 * Used by the writer to wake the freeer.
 */
xtPublic void xt_wr_wake_freeer(XTThreadPtr XT_UNUSED(self), XTDatabaseHPtr db)
{
	/* BUG FIX: Was using tcm_freeer_cond.
	 * This is incorrect. When the freeer waits for the
	 * writer, it uses the writer's condition!
	 */
	xt_lock_mutex_ns(&db->db_wr_lock);
	if (!xt_broadcast_cond_ns(&db->db_wr_cond))
		xt_log_and_clear_exception_ns();
	xt_unlock_mutex_ns(&db->db_wr_lock);
	/*
	xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
	pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
	if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
		xt_log_and_clear_exception_ns();
	freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
	*/
}

/* Wait for a transaction to quit: */
static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs)
{
	if (!self->t_quit)
		xt_timed_wait_cond(NULL, &xt_tab_cache.tcm_freeer_cond, &xt_tab_cache.tcm_freeer_lock, msecs);
}
typedef struct TCResource {
	XTOpenTablePtr		tc_ot;
} TCResourceRec, *TCResourcePtr;

static void tabc_free_fr_resources(XTThreadPtr self, TCResourcePtr tc)
{
	if (tc->tc_ot) {
		xt_db_return_table_to_pool(self, tc->tc_ot);
		tc->tc_ot = NULL;
	}
}

static XTTableHPtr tabc_get_table(XTThreadPtr self, TCResourcePtr tc, xtDatabaseID db_id, xtTableID tab_id)
{
	XTTableHPtr		tab;
	XTDatabaseHPtr	db;

	if (tc->tc_ot) {
		tab = tc->tc_ot->ot_table;
		if (tab->tab_id == tab_id && tab->tab_db->db_id == db_id)
			return tab;

		xt_db_return_table_to_pool(self, tc->tc_ot);
		tc->tc_ot = NULL;
	}

	if (!(db = xt_get_database_by_id(self, db_id)))
		return NULL;

	pushr_(xt_heap_release, db);
	tc->tc_ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE);
	freer_(); // xt_heap_release(db);

	if (!tc->tc_ot)
		return NULL;
	return tc->tc_ot->ot_table;
}
/*
 * Free the given page, or the least recently used page.
 * Return the number of bytes freed.
 */
static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc)
{
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;
	XTTableHPtr					tab = NULL;
	XTTabCachePagePtr			page, lpage, ppage;
	XTTabCacheSegPtr			seg;
	xtBool						was_dirty;
	u_int						page_cnt = 0;

#ifdef DEBUG_CHECK_CACHE
	//XT_TC_check_cache();
#endif

	retry:
	/* Note, handling the page is safe because
	 * there is only one free'er thread which
	 * can remove pages from the cache!
	 */
	if (!(page = dcg->tcm_lru_page)) {
		dcg->tcm_free_try_count = 0;
		return 0;
	}

	retry_2:
	if ((was_dirty = page->tcp_dirty)) {
		/* Do all this stuff without a lock, because to
		 * have a lock while doing this is too expensive!
		 */

		/* Wait for the page to be cleaned. */
		tab = tabc_get_table(self, tc, page->tcp_db_id, page->tcp_tab_id);
	}

	seg = &dcg->tcm_segment[page->tcp_seg];
	TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id);

	if (page->tcp_dirty) {
		if (!was_dirty) {
			/* The page became dirty since we last looked,
			 * so fetch the table (without the lock) first:
			 */
			TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
			goto retry_2;
		}

		if (tab) {
			ASSERT(!XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq+1));
			/* This should never happen. However, it has been occurring
			 * during the multi_update test on Windows.
			 * In particular it occurs after rename of a table, during ALTER.
			 * As if the table was not flushed before the rename!?
			 * To guard against an infinite loop below, I will just continue here.
			 */
			if (XTTableSeq::xt_op_is_before(tab->tab_seq.ts_next_seq, page->tcp_op_seq+1))
				goto free_the_page;

			/* OK, we have the table, now we check where the current
			 * sequence number is.
			 */
			if (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, page->tcp_op_seq)) {
				XTDatabaseHPtr db = tab->tab_db;

				TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);

				/* Flush the log, in case this is holding up the
				 * writer:
				 */
				if (!db->db_xlog.xlog_flush(self)) {
					dcg->tcm_free_try_count = 0;
					return 0;
				}

				xt_lock_mutex(self, &db->db_wr_lock);
				pushr_(xt_unlock_mutex, &db->db_wr_lock);

				/* The freeer is now waiting: */
				db->db_wr_freeer_waiting = TRUE;

				/* If the writer is idle, wake it up.
				 * The writer will commit the changes to the database
				 * which will allow the freeer to free up the cache.
				 */
				if (db->db_wr_idle) {
					if (!xt_broadcast_cond_ns(&db->db_wr_cond))
						xt_log_and_clear_exception_ns();
				}

				/* Go to sleep on the writer's condition.
				 * The writer will wake the free'er before it goes to
				 * sleep!
				 */
				tab->tab_wake_freeer_op = page->tcp_op_seq;
				tab->tab_wr_wake_freeer = TRUE;
				if (!xt_timed_wait_cond_ns(&db->db_wr_cond, &db->db_wr_lock, 30000)) {
					tab->tab_wr_wake_freeer = FALSE;
					db->db_wr_freeer_waiting = FALSE;
					xt_throw(self);
				}
				tab->tab_wr_wake_freeer = FALSE;
				db->db_wr_freeer_waiting = FALSE;
				freer_(); // xt_unlock_mutex(&db->db_wr_lock)

				TAB_CAC_WRITE_LOCK(&seg->tcs_lock, self->t_id);
				if (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, page->tcp_op_seq)) {
					TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
					goto retry;
				}
			}
		}
	}

	free_the_page:
	/* Wait if the page is being read or locked. */
	if (page->tcp_lock_count) {
		/* (1) If the page is being read, then we should not free
		 *     it, because it is in use.
		 * (2) If a page is locked, the locker may be waiting
		 *     for the freeer to free some cache - this
		 *     causes a deadlock.
		 *
		 * Therefore, we move on, and try to free another page...
		 */
		if (page_cnt < (dcg->tcm_approx_page_count >> 1)) {
			/* Page has not changed MRU position, and we
			 * have looked at less than half of the pages.
			 * Go to the next page...
			 */
			if ((page = page->tcp_mr_used)) {
				page_cnt++;
				TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
				goto retry_2;
			}
		}
		TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
		dcg->tcm_free_try_count++;

		/* Starting to spin, free the threads: */
		if (dcg->tcm_threads_waiting) {
			if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
				xt_log_and_clear_exception_ns();
		}
		return 0;
	}

	/* Page is clean, remove from the hash table: */

	/* Find the page on the list: */
	u_int page_idx = page->tcp_page_idx;
	u_int file_id = page->tcp_file_id;

	ppage = NULL;
	lpage = seg->tcs_hash_table[page->tcp_hash_idx];
	while (lpage) {
		if (lpage->tcp_page_idx == page_idx && lpage->tcp_file_id == file_id)
			break;
		ppage = lpage;
		lpage = lpage->tcp_next;
	}

	if (page == lpage) {
		/* Should be the case! */
		if (ppage)
			ppage->tcp_next = page->tcp_next;
		else
			seg->tcs_hash_table[page->tcp_hash_idx] = page->tcp_next;
	}

	/* Remove from the MRU list: */
	xt_lock_mutex_ns(&dcg->tcm_lock);
	if (dcg->tcm_lru_page == page)
		dcg->tcm_lru_page = page->tcp_mr_used;
	if (dcg->tcm_mru_page == page)
		dcg->tcm_mru_page = page->tcp_lr_used;
	if (page->tcp_lr_used)
		page->tcp_lr_used->tcp_mr_used = page->tcp_mr_used;
	if (page->tcp_mr_used)
		page->tcp_mr_used->tcp_lr_used = page->tcp_lr_used;
	xt_unlock_mutex_ns(&dcg->tcm_lock);

	/* Free the page: */
	size_t freed_space = offsetof(XTTabCachePageRec, tcp_data) + page->tcp_data_size;
	ASSERT_NS(seg->tcs_cache_in_use >= freed_space);
	seg->tcs_cache_in_use -= freed_space;
	ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
	xt_free_ns(page);

	TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
	self->st_statistics.st_rec_cache_frees++;
	dcg->tcm_free_try_count = 0;
	return freed_space;
}
#define CACHE_HIGH_LEVEL(d)
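/* Illustration (added comment): the freeer's water marks, as set in
 * xt_tc_set_cache_size(), for a hypothetical 100MB cache:
 *
 *   tcm_low_level  =  70MB  (free down to here once freeing starts)
 *   tcm_mid_level  =  85MB  (start freeing here if threads are waiting)
 *   tcm_high_level =  95MB  (start freeing here otherwise)
 *
 * tabc_free_page() is called in a loop by tabc_fr_main() until usage
 * drops below tcm_low_level or no further page can be freed.
 */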
static void tabc_fr_main(XTThreadPtr self)
{
	register XTTabCacheMemPtr	dcg = &xt_tab_cache;
	TCResourceRec				tc = { 0 };
	int							i;

	xt_set_low_priority(self);
	dcg->tcm_freeer_busy = TRUE;

	while (!self->t_quit) {
		size_t cache_used, freed;

		pushr_(tabc_free_fr_resources, &tc);

		while (!self->t_quit) {
			/* Total up the cache memory used: */
			cache_used = 0;
			for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
				cache_used += dcg->tcm_segment[i].tcs_cache_in_use;

			if (cache_used > dcg->tcm_cache_high)
				dcg->tcm_cache_high = cache_used;

			/* Check if the cache usage is over 95%: */

			/* If threads are waiting then we are more aggressive about freeing
			 * cache:
			 */
			if (cache_used < (dcg->tcm_threads_waiting ? dcg->tcm_mid_level : dcg->tcm_high_level))
				break;

			/* Reduce cache to the low (70%) level: */
			while (!self->t_quit && cache_used > dcg->tcm_low_level) {
				freed = tabc_free_page(self, &tc);
				cache_used -= freed;
				if (cache_used <= dcg->tcm_high_level) {
					/* Wakeup any threads that are waiting for some cache to be
					 * freed:
					 */
					if (dcg->tcm_threads_waiting) {
						if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
							xt_log_and_clear_exception_ns();
					}
				}
			}
		}

		freer_(); // tabc_free_fr_resources(&tc)

		xt_lock_mutex(self, &dcg->tcm_freeer_lock);
		pushr_(xt_unlock_mutex, &dcg->tcm_freeer_lock);

		if (dcg->tcm_threads_waiting) {
			/* Wake threads before we go to sleep: */
			if (!xt_broadcast_cond_ns(&dcg->tcm_freeer_cond))
				xt_log_and_clear_exception_ns();
		}

		/* Wait for a thread that allocates data to signal
		 * that the cache level has exceeded the upper limit:
		 */
		xt_db_approximate_time = time(NULL);
		dcg->tcm_freeer_busy = FALSE;
		/* No idea why, but I am getting an unnecessary pause here.
		 * I run DBT2 with a low record cache.
		 *
		 * Every now and then there is a pause where the freeer is here,
		 * and all user threads are waiting for the freeer.
		 *
		 * So adding the tcm_threads_waiting condition.
		 */
		if (dcg->tcm_threads_waiting) {
			cache_used = 0;
			for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
				cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
			if (cache_used < dcg->tcm_mid_level)
				tabc_fr_wait_for_cache(self, 500);
		}
		else
			tabc_fr_wait_for_cache(self, 500);
			//tabc_fr_wait_for_cache(self, 30*1000);
		dcg->tcm_freeer_busy = TRUE;
		xt_db_approximate_time = time(NULL);
		freer_(); // xt_unlock_mutex(&dcg->tcm_freeer_lock)
	}
}
static void *tabc_fr_run_thread(XTThreadPtr self)
{
	int		count;
	void	*mysql_thread;

	myxt_wait_pbxt_plugin_slot_assigned(self);

	mysql_thread = myxt_create_thread();

	while (!self->t_quit) {
		try_(a) {
			tabc_fr_main(self);
		}
		catch_(a) {
			/* This error is "normal"! */
			if (!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
				self->t_exception.e_sys_err == SIGTERM))
				xt_log_and_clear_exception(self);
		}
		cont_(a);

		/* After an exception, pause before trying again... */
		count = 2;	/* Number of seconds */
		while (!self->t_quit && count > 0) {
			sleep(1);
			count--;
			xt_db_approximate_time = time(NULL);
		}
	}

	/*
	 * {MYSQL-THREAD-KILL}
	 */
	myxt_destroy_thread(mysql_thread, TRUE);
	return NULL;
}
static void tabc_fr_free_thread(XTThreadPtr self, void *XT_UNUSED(data))
{
	if (xt_tab_cache.tcm_freeer_thread) {
		xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
		pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
		xt_tab_cache.tcm_freeer_thread = NULL;
		freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
	}
}
xtPublic void xt_start_freeer(XTThreadPtr self)
{
	xt_tab_cache.tcm_freeer_thread = xt_create_daemon(self, "free-er");
	xt_set_thread_data(xt_tab_cache.tcm_freeer_thread, NULL, tabc_fr_free_thread);
	xt_run_thread(self, xt_tab_cache.tcm_freeer_thread, tabc_fr_run_thread);
}

xtPublic void xt_quit_freeer(XTThreadPtr self)
{
	if (xt_tab_cache.tcm_freeer_thread) {
		xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
		pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
		xt_terminate_thread(self, xt_tab_cache.tcm_freeer_thread);
		freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
	}
}
xtPublic void xt_stop_freeer(XTThreadPtr self)
{
	XTThreadPtr thr_fr;

	if (xt_tab_cache.tcm_freeer_thread) {
		xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
		pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);

		/* This pointer is safe as long as you have the transaction lock. */
		if ((thr_fr = xt_tab_cache.tcm_freeer_thread)) {
			xtThreadID tid = thr_fr->t_id;

			/* Make sure the thread quits when woken up. */
			xt_terminate_thread(self, thr_fr);

			/* Wake the freeer to get it to quit: */
			if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
				xt_log_and_clear_exception_ns();

			freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)

			/*
			 * GOTCHA: This is a weird thing, but the SIGTERM directed
			 * at a particular thread (in this case the sweeper) was
			 * being caught by a different thread and killing the server
			 * sometimes. Disconcerting.
			 * (this may only be a problem on Mac OS X)
			xt_kill_thread(thread);
			*/
			xt_wait_for_thread_to_exit(tid, FALSE);

			/* PMC - This should not be necessary to set the signal here, but in the
			 * debugger the handler is not called!!?
			thr_fr->t_delayed_signal = SIGTERM;
			xt_kill_thread(thread);
			*/
			xt_tab_cache.tcm_freeer_thread = NULL;
		}
		else
			freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
	}
}
xtPublic void xt_load_pages(XTThreadPtr self, XTOpenTablePtr ot)
{
	XTTableHPtr			tab = ot->ot_table;
	xtRecordID			rec_id;
	XTTabCachePagePtr	page;
	XTTabCacheSegPtr	seg;
	size_t				poffset;

	rec_id = 1;
	while (rec_id < tab->tab_row_eof_id) {
		if (!tab->tab_rows.tc_fetch(ot->ot_row_file, rec_id, &seg, &page, &poffset, TRUE, self))
			xt_throw(self);
		TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
		rec_id += tab->tab_rows.tci_rows_per_page;
	}

	rec_id = 1;
	while (rec_id < tab->tab_rec_eof_id) {
		if (!tab->tab_recs.tc_fetch(ot->ot_rec_file, rec_id, &seg, &page, &poffset, TRUE, self))
			xt_throw(self);
		TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
		rec_id += tab->tab_recs.tci_rows_per_page;
	}
}
#ifdef TRACE_DISTRIBUTION
/* ----------------------------------------------------------------------
 * T R A C E   D I S T R I B U T I O N
 */

//#define TABC_TRACE_FILE		"stock"
//#define TABC_TRACE_FILE		"customer"
#define TABC_TRACE_FILE			"upd_ind_PBXT"
//#define TABC_TRACE_FILE		"order_line"

#define TABC_DISPLAY_WIDTH		180

//#define TRACK_ROWS

#ifdef TRACK_ROWS
#define TABC_CORRECT_REC_SIZE(s)	((s) == 4)
#else
#define TABC_CORRECT_REC_SIZE(s)	((s) != 4)
#endif

static XTSpinXSLockRec	tabc_dist_lock;
static u_int			tabc_dist_file_id;
static xtWord8			tabc_output_time;
static int				tabc_recs_per_block;
static int				tabc_change_map_eof;
static int				tabc_change_map_init_eof;
static int				tabc_change_map_size;
static int				*tabc_change_map;
static char				*tabc_change_map_string;

static void tabc_init_dist(XTTabCachePtr cac, XTFilePtr file)
{
	if (!strstr(xt_last_name_of_path(file->fil_path), TABC_TRACE_FILE) || !TABC_CORRECT_REC_SIZE(cac->tci_rec_size))
		return;

	xt_spinxslock_init_with_autoname(NULL, &tabc_dist_lock);
	tabc_dist_file_id = file->fil_id;
	tabc_output_time = xt_trace_clock();
	tabc_recs_per_block = cac->tci_table->tab_row_eof_id / TABC_DISPLAY_WIDTH;
	if (!tabc_recs_per_block)
		tabc_recs_per_block = 1;
	tabc_change_map_init_eof = TABC_DISPLAY_WIDTH;
	tabc_change_map_eof = tabc_change_map_init_eof;
	tabc_change_map_size = tabc_change_map_eof * 2;
	tabc_change_map = (int *) malloc(tabc_change_map_size * sizeof(int));
	memset(tabc_change_map, 0, tabc_change_map_size * sizeof(int));
	tabc_change_map_string = (char *) malloc(tabc_change_map_size + 1);
	printf("FILE: %s\n", file->fil_path);
	printf("rec size: %d\n", (int) cac->tci_rec_size);
	printf("block size: %d\n", tabc_recs_per_block);
	printf("EOF: %d\n", tabc_change_map_eof);
}
1478
static void tabc_dist_output(xtWord8 now)
1484
time = (int) (now - tabc_output_time);
1485
tabc_output_time = now;
1486
xt_spinxslock_xlock(&tabc_dist_lock, FALSE, 0);
1487
for (i=0; i<tabc_change_map_eof; i++) {
1488
v = tabc_change_map[i];
1489
tabc_change_map[i] = 0;
1491
tabc_change_map_string[i] = ' ';
1493
tabc_change_map_string[i] = '_';
1495
tabc_change_map_string[i] = '.';
1497
tabc_change_map_string[i] = '-';
1499
tabc_change_map_string[i] = '~';
1501
tabc_change_map_string[i] = '`';
1503
tabc_change_map_string[i] = '\'';
1505
tabc_change_map_string[i] = '*';
1507
tabc_change_map_string[i] = 0;
1508
printf("%5d %s\n", time / 1000, tabc_change_map_string);
1509
xt_spinxslock_unlock(&tabc_dist_lock, TRUE);
1512
static void tabc_dist_change(XTOpenFilePtr file, xtRefID ref_id)
1516
if (file->fr_file->fil_id != tabc_dist_file_id)
1519
now = xt_trace_clock();
1520
if (now - tabc_output_time >= 1000000)
1521
tabc_dist_output(now);
1523
ref_id = ref_id / tabc_recs_per_block;
1524
if (ref_id < tabc_change_map_size)
1525
tabc_change_map[ref_id]++;
1526
if (ref_id+1 > tabc_change_map_eof)
1527
tabc_change_map_eof = ref_id+1;