1
/* Copyright (c) 2005 PrimeBase Technologies GmbH
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* 2005-01-24 Paul McCullagh
24
#include "xt_config.h"
34
#include "mysql_priv.h"
39
#include "filesys_xt.h"
40
#include "database_xt.h"
41
#include "memory_xt.h"
42
#include "strutil_xt.h"
43
#include "sortedlist_xt.h"
50
static void dl_wake_co_thread(XTDatabaseHPtr db);
53
* --------------------------------------------------------------------------------
57
/*
 * Initialize the sequential data-log reader: remember the buffer size,
 * zero the buffer/record offsets and allocate the read buffer.
 * Returns FALSE (sl_buffer == NULL) if the allocation fails.
 * NOTE(review): lines appear to be elided in this chunk — compare with
 * the upstream PBXT source before relying on exact control flow.
 */
xtBool XTDataSeqRead::sl_seq_init(struct XTDatabase *db, size_t buffer_size)
60
sl_buffer_size = buffer_size;
65
sl_buf_log_offset = 0;
67
sl_buffer = (xtWord1 *) xt_malloc_ns(buffer_size);
70
sl_rec_log_offset = 0;
74
/* Allocation failure is reported by returning FALSE: */
return sl_buffer != NULL;
77
/*
 * Release the reader's resources: close the open log file and free the
 * read buffer. Presumably guarded by NULL checks on lines not visible
 * in this chunk — confirm against the original source.
 */
void XTDataSeqRead::sl_seq_exit()
80
xt_close_file_ns(sl_log_file);
84
xt_free_ns(sl_buffer);
89
/* Accessor — presumably returns the reader's open log file (sl_log_file);
 * body not visible in this chunk, TODO confirm. */
XTOpenFilePtr XTDataSeqRead::sl_seq_open_file()
94
/*
 * Report the current read position: the log ID and offset of the record
 * most recently positioned on.
 */
void XTDataSeqRead::sl_seq_pos(xtLogID *log_id, xtLogOffset *log_offset)
96
*log_id = sl_rec_log_id;
97
*log_offset = sl_rec_log_offset;
100
/*
 * Position the reader at (log_id, log_offset). If the target log differs
 * from the one currently open, close it and open the new one; with
 * missing_ok set, a missing log file is tolerated (XT_FS_MISSING_OK).
 * Also records the log's EOF for later bounds checks.
 */
xtBool XTDataSeqRead::sl_seq_start(xtLogID log_id, xtLogOffset log_offset, xtBool missing_ok)
102
if (sl_rec_log_id != log_id) {
104
xt_close_file_ns(sl_log_file);
108
sl_rec_log_id = log_id;
109
sl_buf_log_offset = sl_rec_log_offset;
112
if (!sl_db->db_datalogs.dlc_open_log(&sl_log_file, log_id, missing_ok ? XT_FS_MISSING_OK : XT_FS_DEFAULT))
115
sl_log_eof = xt_seek_eof_file(NULL, sl_log_file);
117
sl_rec_log_offset = log_offset;
122
/*
 * Random-access read from the current log file; thin wrapper around
 * xt_pread_file() that charges the I/O to the thread's data statistics.
 */
xtBool XTDataSeqRead::sl_rnd_read(xtLogOffset log_offset, size_t size, xtWord1 *buffer, size_t *data_read, struct XTThread *thread)
128
return xt_pread_file(sl_log_file, log_offset, size, 0, buffer, data_read, &thread->st_statistics.st_data, thread);
132
* Unlike the transaction log sequential reader, this function only returns
133
* the header of a record.
136
* This function now skips gaps. This should not be required, because in normal
137
* operation, no gaps should be created.
139
* However, if this happens there is a danger that a valid record after the
142
* So, if we find an invalid record, we scan through the log to find the next
143
* valid record. Note, that there is still a danger that we will find
144
* data that looks like a valid record, but is not.
146
* In this case, this "pseudo record" may cause the function to actually skip
149
* Note, any such malfunction will eventually cause the record to be lost forever
150
* after the garbage collector has run.
152
/*
 * Advance to the next record in the data log and return a pointer to its
 * header in *ret_entry. Skips gaps by scanning forward for the next
 * record that passes the validity checks (see the comment block above).
 * NOTE(review): many lines of this function are elided in this chunk
 * (error paths, braces, labels) — treat the visible flow as partial.
 */
xtBool XTDataSeqRead::sl_seq_next(XTXactLogBufferDPtr *ret_entry, struct XTThread *thread)
154
XTXactLogBufferDPtr record;
159
xtBool reread_from_buffer;
161
xtLogOffset gap_start = 0;
163
/* Go to the next record (sl_record_len must be initialized
164
* to 0 for this to work.
167
sl_rec_log_offset += sl_record_len;
170
if (sl_rec_log_offset < sl_buf_log_offset ||
171
sl_rec_log_offset >= sl_buf_log_offset + (xtLogOffset) sl_buffer_len) {
172
/* The current position is nowhere near the buffer, read data into the
175
tfer = sl_buffer_size;
176
if (!sl_rnd_read(sl_rec_log_offset, tfer, sl_buffer, &tfer, thread))
178
sl_buf_log_offset = sl_rec_log_offset;
179
sl_buffer_len = tfer;
181
/* Should we go to the next log? */
186
/* The start of the record is in the buffer: */
188
rec_offset = (size_t) (sl_rec_log_offset - sl_buf_log_offset);
189
max_rec_len = sl_buffer_len - rec_offset;
190
reread_from_buffer = FALSE;
193
/* Check the type of record: */
194
record = (XTXactLogBufferDPtr) (sl_buffer + rec_offset);
195
switch (record->xl.xl_status_1) {
196
case XT_LOG_ENT_HEADER:
197
/* A file header is only valid at offset 0: */
if (sl_rec_log_offset != 0)
198
goto scan_to_next_record;
199
if (offsetof(XTXactLogHeaderDRec, xh_size_4) + 4 > max_rec_len) {
200
reread_from_buffer = TRUE;
203
len = XT_GET_DISK_4(record->xh.xh_size_4);
204
if (len > max_rec_len) {
205
reread_from_buffer = TRUE;
209
/* Validate checksum, magic and (if present) the stored log ID: */
if (record->xh.xh_checksum_1 != XT_CHECKSUM_1(sl_rec_log_id))
211
if (XT_LOG_HEAD_MAGIC(record, len) != XT_LOG_FILE_MAGIC)
213
if (len > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
214
if (XT_GET_DISK_4(record->xh.xh_log_id_4) != sl_rec_log_id)
218
case XT_LOG_ENT_EXT_REC_OK:
219
case XT_LOG_ENT_EXT_REC_DEL:
221
xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap in data log %lu, start: %llu, size: %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start, (u_llong) (sl_rec_log_offset - gap_start));
224
len = offsetof(XTactExtRecEntryDRec, er_data);
225
if (len > max_rec_len) {
226
reread_from_buffer = TRUE;
229
size = XT_GET_DISK_4(record->er.er_data_size_4);
230
/* Verify the record as good as we can! */
232
goto scan_to_next_record;
233
/* Reject records that would extend past the known EOF: */
if (sl_rec_log_offset + (xtLogOffset) offsetof(XTactExtRecEntryDRec, er_data) + size > sl_log_eof)
234
goto scan_to_next_record;
235
if (!XT_GET_DISK_4(record->er.er_tab_id_4))
236
goto scan_to_next_record;
237
if (!XT_GET_DISK_4(record->er.er_rec_id_4))
238
goto scan_to_next_record;
241
/* Note, we no longer assume EOF.
242
* Instead, we skip to the next value record. */
243
goto scan_to_next_record;
246
if (len <= max_rec_len) {
247
/* The record is completely in the buffer: */
248
sl_record_len = len+size;
254
/* The record is partially in the buffer. */
255
memmove(sl_buffer, sl_buffer + rec_offset, max_rec_len);
256
sl_buf_log_offset += rec_offset;
257
sl_buffer_len = max_rec_len;
259
/* Read the rest, as far as possible: */
260
tfer = sl_buffer_size - max_rec_len;
261
if (!sl_rnd_read(sl_buf_log_offset + max_rec_len, tfer, sl_buffer + max_rec_len, &tfer, thread))
263
sl_buffer_len += tfer;
265
if (sl_buffer_len < len)
266
/* A partial record is in the log, must be the end of the log: */
269
if (reread_from_buffer)
270
goto read_from_buffer;
272
/* The record is not completely in the buffer: */
274
*ret_entry = (XTXactLogBufferDPtr) sl_buffer;
279
gap_start = sl_rec_log_offset;
280
xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap found in data log %lu, starting at offset %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start);
288
xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap in data log %lu, start: %llu, size: %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start, (u_llong) (sl_rec_log_offset - gap_start));
295
/* Skip 'size' bytes: the skipped amount is added to the current record
 * length, so the next sl_seq_next() call advances past it. */
void XTDataSeqRead::sl_seq_skip(size_t size)
297
sl_record_len += size;
300
/* Skip forward to an absolute log offset (no-op if the target is behind
 * the current record position). */
void XTDataSeqRead::sl_seq_skip_to(off_t log_offset)
302
if (log_offset >= sl_rec_log_offset)
303
sl_record_len = (size_t) (log_offset - sl_rec_log_offset);
307
* --------------------------------------------------------------------------------
311
/*
 * Write a fresh, fully-initialized header record at offset 0 of the data
 * log and flush it to disk. Used when a log is created or its header was
 * found to be incomplete during recovery.
 */
static xtBool dl_create_log_header(XTDataLogFilePtr data_log, XTOpenFilePtr of, XTThreadPtr thread)
313
XTXactLogHeaderDRec header;
315
/* The header was not completely written, so write a new one: */
316
memset(&header, 0, sizeof(XTXactLogHeaderDRec));
317
header.xh_status_1 = XT_LOG_ENT_HEADER;
318
header.xh_checksum_1 = XT_CHECKSUM_1(data_log->dlf_log_id);
319
XT_SET_DISK_4(header.xh_size_4, sizeof(XTXactLogHeaderDRec));
320
XT_SET_DISK_8(header.xh_free_space_8, 0);
321
XT_SET_DISK_8(header.xh_file_len_8, sizeof(XTXactLogHeaderDRec));
322
XT_SET_DISK_4(header.xh_log_id_4, data_log->dlf_log_id);
323
XT_SET_DISK_2(header.xh_version_2, XT_LOG_VERSION_NO);
324
XT_SET_DISK_4(header.xh_magic_4, XT_LOG_FILE_MAGIC);
325
if (!xt_pwrite_file(of, 0, sizeof(XTXactLogHeaderDRec), &header, &thread->st_statistics.st_data, thread))
327
if (!xt_flush_file(of, &thread->st_statistics.st_data, thread))
332
/*
 * Persist the log's current garbage count into the header's free-space
 * field (8 bytes at xh_free_space_8), optionally flushing afterwards.
 */
static xtBool dl_write_garbage_level(XTDataLogFilePtr data_log, XTOpenFilePtr of, xtBool flush, XTThreadPtr thread)
334
XTXactLogHeaderDRec header;
336
/* Update only the free-space (garbage) field of the on-disk header: */
337
XT_SET_DISK_8(header.xh_free_space_8, data_log->dlf_garbage_count);
338
if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_free_space_8), 8, (xtWord1 *) &header.xh_free_space_8, &thread->st_statistics.st_data, thread))
340
if (flush && !xt_flush_file(of, &thread->st_statistics.st_data, thread))
347
* Extra garbage is the amount of space skipped during recovery of the data
348
* log file. We assume this space has not been counted as garbage,
349
* and add it to the garbage count.
351
* This may mean that our estimate of garbage is higher than it should
352
* be, but that is better than the other way around.
354
* The fact is, there should not be any gaps in the data log files, so
355
* this is actually an exception which should not occur.
357
/*
 * Persist the log's EOF and garbage count into the on-disk header after
 * recovery, folding in 'extra_garbage' (space skipped during recovery).
 * The garbage count is clamped to the file length, then flushed.
 */
static xtBool dl_write_log_header(XTDataLogFilePtr data_log, XTOpenFilePtr of, xtLogOffset extra_garbage, XTThreadPtr thread)
359
XTXactLogHeaderDRec header;
361
XT_SET_DISK_8(header.xh_file_len_8, data_log->dlf_log_eof);
364
data_log->dlf_garbage_count += extra_garbage;
365
/* Garbage can never exceed the file length: */
if (data_log->dlf_garbage_count > data_log->dlf_log_eof)
366
data_log->dlf_garbage_count = data_log->dlf_log_eof;
367
XT_SET_DISK_8(header.xh_free_space_8, data_log->dlf_garbage_count);
368
if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_free_space_8), 16, (xtWord1 *) &header.xh_free_space_8, &thread->st_statistics.st_data, thread))
372
if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_file_len_8), 8, (xtWord1 *) &header.xh_file_len_8, &thread->st_statistics.st_data, thread))
375
if (!xt_flush_file(of, &thread->st_statistics.st_data, thread))
380
/* Cleanup callback for pushr_(): releases a sequential reader's resources. */
static void dl_free_seq_read(XTThreadPtr self __attribute__((unused)), XTDataSeqReadPtr seq_read)
382
seq_read->sl_seq_exit();
385
/*
 * Recover a single data log at startup: scan it sequentially, restore
 * the in-memory garbage count / start offset / EOF from the header and
 * the records found, then rewrite the header with corrected values.
 * If the log is shorter than a header, a fresh header is created.
 */
static void dl_recover_log(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogFilePtr data_log)
387
XTDataSeqReadRec seq_read;
388
XTXactLogBufferDPtr record;
390
if (!seq_read.sl_seq_init(db, xt_db_log_buffer_size))
392
pushr_(dl_free_seq_read, &seq_read);
394
seq_read.sl_seq_start(data_log->dlf_log_id, 0, FALSE);
397
if (!seq_read.sl_seq_next(&record, self))
401
switch (record->xh.xh_status_1) {
402
case XT_LOG_ENT_HEADER:
403
data_log->dlf_garbage_count = XT_GET_DISK_8(record->xh.xh_free_space_8);
404
data_log->dlf_start_offset = XT_GET_DISK_8(record->xh.xh_comp_pos_8);
405
/* Trust the header's recorded file length and jump over it: */
seq_read.sl_seq_skip_to((off_t) XT_GET_DISK_8(record->xh.xh_file_len_8));
410
ASSERT_NS(seq_read.sl_log_eof == seq_read.sl_rec_log_offset);
411
data_log->dlf_log_eof = seq_read.sl_rec_log_offset;
413
if (data_log->dlf_log_eof < (off_t) sizeof(XTXactLogHeaderDRec)) {
414
data_log->dlf_log_eof = sizeof(XTXactLogHeaderDRec);
415
if (!dl_create_log_header(data_log, seq_read.sl_log_file, self))
419
if (!dl_write_log_header(data_log, seq_read.sl_log_file, seq_read.sl_extra_garbage, self))
423
freer_(); // dl_free_seq_read(&seq_read)
427
* --------------------------------------------------------------------------------
428
* D A T A L O G C AC H E
431
/*
 * Remove a data log from whichever state list it currently belongs to
 * (has-space / to-compact / to-delete / deleted), keyed by its log ID.
 * Caller presumably holds dlc_lock — confirm against callers.
 */
void XTDataLogCache::dls_remove_log(XTDataLogFilePtr data_log)
433
xtLogID log_id = data_log->dlf_log_id;
435
switch (data_log->dlf_state) {
436
case XT_DL_HAS_SPACE:
437
xt_sl_delete(NULL, dlc_has_space, &log_id);
439
case XT_DL_TO_COMPACT:
440
xt_sl_delete(NULL, dlc_to_compact, &log_id);
442
case XT_DL_TO_DELETE:
443
xt_sl_delete(NULL, dlc_to_delete, &log_id);
446
xt_sl_delete(NULL, dlc_deleted, &log_id);
451
/*
 * Derive a log's state from its statistics: compact it if it holds too
 * much garbage, write to it if it still has space, otherwise read-only.
 */
int XTDataLogCache::dls_get_log_state(XTDataLogFilePtr data_log)
453
if (data_log->dlf_to_much_garbage())
454
return XT_DL_TO_COMPACT;
455
if (data_log->dlf_space_avaliable() > 0)
456
return XT_DL_HAS_SPACE;
457
return XT_DL_READ_ONLY;
460
/*
 * Transition a data log to a new state under dlc_lock, moving it between
 * the state-specific sorted lists. XT_DL_MAY_COMPACT is a conditional
 * request (only honored from UNKNOWN/HAS_SPACE/READ_ONLY); XT_DL_UNKNOWN
 * means "recompute from statistics". Setting TO_COMPACT wakes the
 * compactor thread. Failure paths (second unlock below) indicate a
 * sorted-list insert failed.
 */
xtBool XTDataLogCache::dls_set_log_state(XTDataLogFilePtr data_log, int state)
462
xtLogID log_id = data_log->dlf_log_id;
464
xt_lock_mutex_ns(&dlc_lock);
465
if (state == XT_DL_MAY_COMPACT) {
466
if (data_log->dlf_state != XT_DL_UNKNOWN &&
467
data_log->dlf_state != XT_DL_HAS_SPACE &&
468
data_log->dlf_state != XT_DL_READ_ONLY)
470
state = XT_DL_TO_COMPACT;
472
if (state == XT_DL_UNKNOWN)
473
state = dls_get_log_state(data_log);
475
case XT_DL_HAS_SPACE:
476
if (data_log->dlf_state != XT_DL_HAS_SPACE) {
477
dls_remove_log(data_log);
478
if (!xt_sl_insert(NULL, dlc_has_space, &log_id, &log_id))
482
case XT_DL_TO_COMPACT:
483
#ifdef DEBUG_LOG_DELETE
484
printf("-- set to compact: %d\n", (int) log_id);
486
if (data_log->dlf_state != XT_DL_TO_COMPACT) {
487
dls_remove_log(data_log);
488
if (!xt_sl_insert(NULL, dlc_to_compact, &log_id, &log_id))
491
/* Notify the compactor that there is work to do: */
dl_wake_co_thread(dlc_db);
493
case XT_DL_COMPACTED:
494
#ifdef DEBUG_LOG_DELETE
495
printf("-- set compacted: %d\n", (int) log_id);
497
if (data_log->dlf_state != state)
498
dls_remove_log(data_log);
500
case XT_DL_TO_DELETE:
501
#ifdef DEBUG_LOG_DELETE
502
printf("-- set to delete log: %d\n", (int) log_id);
504
if (data_log->dlf_state != XT_DL_TO_DELETE) {
505
dls_remove_log(data_log);
506
if (!xt_sl_insert(NULL, dlc_to_delete, &log_id, &log_id))
511
#ifdef DEBUG_LOG_DELETE
512
printf("-- set DELETED log: %d\n", (int) log_id);
514
if (data_log->dlf_state != XT_DL_DELETED) {
515
dls_remove_log(data_log);
516
if (!xt_sl_insert(NULL, dlc_deleted, &log_id, &log_id))
521
if (data_log->dlf_state != state)
522
dls_remove_log(data_log);
525
data_log->dlf_state = state;
528
xt_unlock_mutex_ns(&dlc_lock);
532
xt_unlock_mutex_ns(&dlc_lock);
536
/*
 * Sorted-list comparator ordering xtLogID keys ascending; return values
 * for the equal/less branches are on lines elided from this chunk.
 */
static int dl_cmp_log_id(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
538
xtLogID log_id_a = *((xtLogID *) a);
539
xtLogID log_id_b = *((xtLogID *) b);
541
if (log_id_a == log_id_b)
543
if (log_id_a < log_id_b)
548
/*
 * Initialize the data-log cache: zero the structure, create the locks,
 * condition variables and the four state lists, then scan the database's
 * data directory for existing "*.xt" logs, recovering each and filing it
 * under the state derived from its statistics (XT_DL_UNKNOWN).
 */
void XTDataLogCache::dlc_init(XTThreadPtr self, XTDatabaseHPtr db)
551
char log_dir[PATH_MAX];
554
XTDataLogFilePtr data_log= NULL;
556
memset(this, 0, sizeof(XTDataLogCacheRec));
559
xt_init_mutex_with_autoname(self, &dlc_lock);
560
xt_init_cond(self, &dlc_cond);
561
for (u_int i=0; i<XT_DL_NO_OF_SEGMENTS; i++) {
562
xt_init_mutex_with_autoname(self, &dlc_segment[i].dls_lock);
563
xt_init_cond(self, &dlc_segment[i].dls_cond);
565
dlc_has_space = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
566
dlc_to_compact = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
567
dlc_to_delete = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
568
dlc_deleted = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
569
xt_init_mutex_with_autoname(self, &dlc_mru_lock);
570
xt_init_mutex_with_autoname(self, &dlc_head_lock);
572
xt_strcpy(PATH_MAX, log_dir, dlc_db->db_main_path);
573
xt_add_data_dir(PATH_MAX, log_dir);
574
if (xt_fs_exists(log_dir)) {
575
pushsr_(od, xt_dir_close, xt_dir_open(self, log_dir, NULL));
576
while (xt_dir_next(self, od)) {
577
file = xt_dir_name(self, od);
578
if (xt_ends_with(file, ".xt")) {
579
if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
580
if (!dlc_get_data_log(&data_log, log_id, TRUE, NULL))
582
dl_recover_log(self, db, data_log);
583
if (!dls_set_log_state(data_log, XT_DL_UNKNOWN))
598
/*
 * Tear down the data-log cache: free the four state lists, then walk
 * every segment hash chain closing each log's exclusive file handle and
 * its pool of open files, freeing the structures, and finally destroy
 * all mutexes and condition variables.
 */
void XTDataLogCache::dlc_exit(XTThreadPtr self)
600
XTDataLogFilePtr data_log, tmp_data_log;
601
XTOpenLogFilePtr open_log, tmp_open_log;
604
xt_free_sortedlist(self, dlc_has_space);
605
dlc_has_space = NULL;
607
if (dlc_to_compact) {
608
xt_free_sortedlist(self, dlc_to_compact);
609
dlc_to_compact = NULL;
612
xt_free_sortedlist(self, dlc_to_delete);
613
dlc_to_delete = NULL;
616
xt_free_sortedlist(self, dlc_deleted);
619
for (u_int i=0; i<XT_DL_NO_OF_SEGMENTS; i++) {
620
for (u_int j=0; j<XT_DL_SEG_HASH_TABLE_SIZE; j++) {
621
data_log = dlc_segment[i].dls_hash_table[j];
622
dlc_segment[i].dls_hash_table[j] = NULL;
624
if (data_log->dlf_log_file) {
625
xt_close_file_ns(data_log->dlf_log_file);
626
data_log->dlf_log_file = NULL;
629
open_log = data_log->dlf_free_list;
631
if (open_log->odl_log_file)
632
xt_close_file(self, open_log->odl_log_file);
633
tmp_open_log = open_log;
634
open_log = open_log->odl_next_free;
635
xt_free(self, tmp_open_log);
637
tmp_data_log = data_log;
638
data_log = data_log->dlf_next_hash;
640
xt_free(self, tmp_data_log);
643
xt_free_mutex(&dlc_segment[i].dls_lock);
644
xt_free_cond(&dlc_segment[i].dls_cond);
646
xt_free_mutex(&dlc_head_lock);
647
xt_free_mutex(&dlc_mru_lock);
648
xt_free_mutex(&dlc_lock);
649
xt_free_cond(&dlc_cond);
652
/*
 * Build the full path of a data log file: <db main path>/<data dir>/
 * dlog-<id>.xt, written into 'path' (capacity 'size').
 */
void XTDataLogCache::dlc_name(size_t size, char *path, xtLogID log_id)
656
sprintf(name, "dlog-%lu.xt", (u_long) log_id);
657
xt_strcpy(size, path, dlc_db->db_main_path);
658
xt_add_data_dir(size, path);
659
xt_add_dir_char(size, path);
660
xt_strcat(size, path, name);
663
/*
 * Open the data log with the given ID using the given file-system mode
 * flags; the file is grown in 16MB units (last argument).
 */
xtBool XTDataLogCache::dlc_open_log(XTOpenFilePtr *fh, xtLogID log_id, int mode)
665
char log_path[PATH_MAX];
667
dlc_name(PATH_MAX, log_path, log_id);
668
return xt_open_file_ns(fh, log_path, XT_FT_STANDARD, mode, 16*1024*1024);
671
/*
 * Release exclusive use of a data log: close its private file handle and
 * return it to the pool with its state recomputed (XT_DL_UNKNOWN).
 */
xtBool XTDataLogCache::dlc_unlock_log(XTDataLogFilePtr data_log)
673
if (data_log->dlf_log_file) {
674
xt_close_file_ns(data_log->dlf_log_file);
675
data_log->dlf_log_file = NULL;
678
return dls_set_log_state(data_log, XT_DL_UNKNOWN);
681
/*
 * Acquire a data log for exclusive writing, holding dlc_lock. First try
 * the "has space" list for a log with at least 'space_required' free;
 * otherwise allocate a new log ID (bounded by XT_DL_MAX_LOG_ID), create
 * the file and write its initial header. The chosen log is marked
 * XT_DL_EXCLUSIVE. Returns NULL on failure (second unlock path).
 */
XTDataLogFilePtr XTDataLogCache::dlc_get_log_for_writing(off_t space_required, struct XTThread *thread)
683
xtLogID log_id, *log_id_ptr = NULL;
686
XTDataLogFilePtr data_log = NULL;
688
xt_lock_mutex_ns(&dlc_lock);
690
/* Look for an existing log with enough space: */
691
size = xt_sl_get_size(dlc_has_space);
692
for (idx=0; idx<size; idx++) {
693
log_id_ptr = (xtLogID *) xt_sl_item_at(dlc_has_space, idx);
694
if (!dlc_get_data_log(&data_log, *log_id_ptr, FALSE, NULL))
697
if (data_log->dlf_space_avaliable() >= space_required)
703
xt_sl_delete_item_at(NULL, dlc_has_space, idx);
711
if (!dlc_open_log(&data_log->dlf_log_file, *log_id_ptr, XT_FS_DEFAULT))
713
xt_sl_delete_item_at(NULL, dlc_has_space, idx);
716
/* Create a new log: */
717
log_id = dlc_next_log_id;
718
for (u_int i=0; i<XT_DL_MAX_LOG_ID; i++) {
720
if (log_id > XT_DL_MAX_LOG_ID)
722
if (!dlc_get_data_log(&data_log, log_id, FALSE, NULL))
727
dlc_next_log_id = log_id;
729
xt_register_ulxterr(XT_REG_CONTEXT, XT_ERR_LOG_MAX_EXCEEDED, (u_long) XT_DL_MAX_LOG_ID);
732
if (!dlc_get_data_log(&data_log, log_id, TRUE, NULL))
734
if (!dlc_open_log(&data_log->dlf_log_file, log_id, XT_FS_CREATE | XT_FS_MAKE_PATH))
736
data_log->dlf_log_eof = sizeof(XTXactLogHeaderDRec);
737
if (!dl_create_log_header(data_log, data_log->dlf_log_file, thread)) {
738
xt_close_file_ns(data_log->dlf_log_file);
741
/* By setting this late we ensure that the error
744
dlc_next_log_id = log_id;
746
data_log->dlf_state = XT_DL_EXCLUSIVE;
748
xt_unlock_mutex_ns(&dlc_lock);
752
xt_unlock_mutex_ns(&dlc_lock);
756
xtBool XTDataLogCache::dlc_get_data_log(XTDataLogFilePtr *lf, xtLogID log_id, xtBool create, XTDataLogSegPtr *ret_seg)
758
register XTDataLogSegPtr seg;
759
register u_int hash_idx;
760
register XTDataLogFilePtr data_log;
762
/* Which segment, and hash index: */
763
seg = &dlc_segment[log_id & XT_DL_SEGMENT_MASK];
764
hash_idx = (log_id >> XT_DL_SEGMENT_SHIFTS) % XT_DL_SEG_HASH_TABLE_SIZE;
766
/* Lock the segment: */
767
xt_lock_mutex_ns(&seg->dls_lock);
769
/* Find the log file on the hash list: */
770
data_log = seg->dls_hash_table[hash_idx];
772
if (data_log->dlf_log_id == log_id)
774
data_log = data_log->dlf_next_hash;
777
if (!data_log && create) {
778
/* Create a new log file structure: */
779
if (!(data_log = (XTDataLogFilePtr) xt_calloc_ns(sizeof(XTDataLogFileRec))))
781
data_log->dlf_log_id = log_id;
782
data_log->dlf_next_hash = seg->dls_hash_table[hash_idx];
783
seg->dls_hash_table[hash_idx] = data_log;
787
/* This gives the caller the lock: */
793
xt_unlock_mutex_ns(&seg->dls_lock);
798
xt_unlock_mutex_ns(&seg->dls_lock);
803
* If just_close is FALSE, then a log is being deleted.
804
* This means that the log may still be in exclusive use by
805
* some thread. So we just close the log!
807
/*
 * Remove a data log from the cache. Closes and frees every pooled open
 * file for the log (removing each from the MRU list under dlc_mru_lock),
 * then waits (timed, 2s per round) for any remaining users, closes the
 * exclusive handle, unlinks the structure from the segment hash chain
 * and frees it. With just_close set, the log may still be in exclusive
 * use elsewhere, so it is only closed (see comment above).
 */
xtBool XTDataLogCache::dlc_remove_data_log(xtLogID log_id, xtBool just_close)
809
register XTDataLogSegPtr seg;
810
register u_int hash_idx;
811
register XTDataLogFilePtr data_log;
812
XTOpenLogFilePtr open_log, tmp_open_log;
814
/* Which segment, and hash index: */
815
seg = &dlc_segment[log_id & XT_DL_SEGMENT_MASK];
816
hash_idx = (log_id >> XT_DL_SEGMENT_SHIFTS) % XT_DL_SEG_HASH_TABLE_SIZE;
818
/* Lock the segment: */
820
xt_lock_mutex_ns(&seg->dls_lock);
822
/* Find the log file on the hash list: */
823
data_log = seg->dls_hash_table[hash_idx];
825
if (data_log->dlf_log_id == log_id)
827
data_log = data_log->dlf_next_hash;
831
xt_lock_mutex_ns(&dlc_mru_lock);
833
open_log = data_log->dlf_free_list;
835
if (open_log->odl_log_file)
836
xt_close_file_ns(open_log->odl_log_file);
838
/* Remove from MRU list: */
839
if (dlc_lru_open_log == open_log) {
840
dlc_lru_open_log = open_log->odl_mr_used;
841
ASSERT_NS(!open_log->odl_lr_used);
843
else if (open_log->odl_lr_used)
844
open_log->odl_lr_used->odl_mr_used = open_log->odl_mr_used;
845
if (dlc_mru_open_log == open_log) {
846
dlc_mru_open_log = open_log->odl_lr_used;
847
ASSERT_NS(!open_log->odl_mr_used);
849
else if (open_log->odl_mr_used)
850
open_log->odl_mr_used->odl_lr_used = open_log->odl_lr_used;
852
data_log->dlf_open_count--;
853
tmp_open_log = open_log;
854
open_log = open_log->odl_next_free;
855
xt_free_ns(tmp_open_log);
857
data_log->dlf_free_list = NULL;
859
xt_unlock_mutex_ns(&dlc_mru_lock);
861
/* Wait for other users of this log to release it: */
if (data_log->dlf_open_count) {
862
if (!xt_timed_wait_cond_ns(&seg->dls_cond, &seg->dls_lock, 2000))
864
xt_unlock_mutex_ns(&seg->dls_lock);
868
/* Close the exclusive file if required: */
869
if (data_log->dlf_log_file) {
870
xt_close_file_ns(data_log->dlf_log_file);
871
data_log->dlf_log_file = NULL;
875
/* Remove the log from the hash list: */
876
XTDataLogFilePtr ptr, pptr = NULL;
878
ptr = seg->dls_hash_table[hash_idx];
883
ptr = ptr->dlf_next_hash;
886
if (ptr == data_log) {
888
pptr->dlf_next_hash = ptr->dlf_next_hash;
890
seg->dls_hash_table[hash_idx] = ptr->dlf_next_hash;
893
xt_free_ns(data_log);
897
xt_unlock_mutex_ns(&seg->dls_lock);
901
xt_unlock_mutex_ns(&seg->dls_lock);
905
/*
 * Get an open (pooled) file handle for the given data log, creating the
 * log structure and/or opening a new file as required, and maintaining
 * the MRU list of open files. If the pool exceeds XT_DL_LOG_POOL_SIZE,
 * least-recently-used unused handles are closed down to 3/4 capacity.
 * Only the grow-size typo below was changed; the rest is untouched.
 */
xtBool XTDataLogCache::dlc_get_open_log(XTOpenLogFilePtr *ol, xtLogID log_id)
907
register XTDataLogSegPtr seg;
908
register u_int hash_idx;
909
register XTDataLogFilePtr data_log;
910
register XTOpenLogFilePtr open_log;
913
/* Which segment, and hash index: */
914
seg = &dlc_segment[log_id & XT_DL_SEGMENT_MASK];
915
hash_idx = (log_id >> XT_DL_SEGMENT_SHIFTS) % XT_DL_SEG_HASH_TABLE_SIZE;
917
/* Lock the segment: */
918
xt_lock_mutex_ns(&seg->dls_lock);
920
/* Find the log file on the hash list: */
921
data_log = seg->dls_hash_table[hash_idx];
923
if (data_log->dlf_log_id == log_id)
925
data_log = data_log->dlf_next_hash;
929
/* Create a new log file structure: */
930
dlc_name(PATH_MAX, path, log_id);
931
if (!xt_fs_exists(path)) {
932
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_DATA_LOG_NOT_FOUND, path);
935
if (!(data_log = (XTDataLogFilePtr) xt_calloc_ns(sizeof(XTDataLogFileRec))))
937
data_log->dlf_log_id = log_id;
938
data_log->dlf_next_hash = seg->dls_hash_table[hash_idx];
939
seg->dls_hash_table[hash_idx] = data_log;
942
if ((open_log = data_log->dlf_free_list)) {
943
/* Remove from the free list: */
944
if ((data_log->dlf_free_list = open_log->odl_next_free))
945
data_log->dlf_free_list->odl_prev_free = NULL;
947
/* This file has been most recently used: */
948
if (XT_TIME_DIFF(open_log->odl_ru_time, dlc_ru_now) > (XT_DL_LOG_POOL_SIZE >> 1)) {
949
/* Move to the front of the MRU list: */
950
xt_lock_mutex_ns(&dlc_mru_lock);
952
open_log->odl_ru_time = ++dlc_ru_now;
953
if (dlc_mru_open_log != open_log) {
954
/* Remove from the MRU list: */
955
if (dlc_lru_open_log == open_log) {
956
dlc_lru_open_log = open_log->odl_mr_used;
957
ASSERT_NS(!open_log->odl_lr_used);
959
else if (open_log->odl_lr_used)
960
open_log->odl_lr_used->odl_mr_used = open_log->odl_mr_used;
961
if (open_log->odl_mr_used)
962
open_log->odl_mr_used->odl_lr_used = open_log->odl_lr_used;
964
/* Make the file the most recently used: */
965
if ((open_log->odl_lr_used = dlc_mru_open_log))
966
dlc_mru_open_log->odl_mr_used = open_log;
967
open_log->odl_mr_used = NULL;
968
dlc_mru_open_log = open_log;
969
if (!dlc_lru_open_log)
970
dlc_lru_open_log = open_log;
972
xt_unlock_mutex_ns(&dlc_mru_lock);
976
/* Create a new open file: */
977
if (!(open_log = (XTOpenLogFilePtr) xt_calloc_ns(sizeof(XTOpenLogFileRec))))
979
dlc_name(PATH_MAX, path, log_id);
980
/* BUGFIX: grow size was 16*1024*1204 — a typo for 16MB; now matches
 * the 16*1024*1024 used by dlc_open_log(). */
if (!xt_open_file_ns(&open_log->odl_log_file, path, XT_FT_STANDARD, XT_FS_DEFAULT, 16*1024*1024)) {
981
xt_free_ns(open_log);
984
open_log->olf_log_id = log_id;
985
open_log->odl_data_log = data_log;
986
data_log->dlf_open_count++;
988
/* Make the new open file the most recently used: */
989
xt_lock_mutex_ns(&dlc_mru_lock);
990
open_log->odl_ru_time = ++dlc_ru_now;
991
if ((open_log->odl_lr_used = dlc_mru_open_log))
992
dlc_mru_open_log->odl_mr_used = open_log;
993
open_log->odl_mr_used = NULL;
994
dlc_mru_open_log = open_log;
995
if (!dlc_lru_open_log)
996
dlc_lru_open_log = open_log;
998
xt_unlock_mutex_ns(&dlc_mru_lock);
1001
open_log->odl_in_use = TRUE;
1002
xt_unlock_mutex_ns(&seg->dls_lock);
1005
if (dlc_open_count > XT_DL_LOG_POOL_SIZE) {
1006
u_int target = XT_DL_LOG_POOL_SIZE / 4 * 3;
1007
xtLogID free_log_id;
1009
/* Remove some open files: */
1010
while (dlc_open_count > target) {
1011
XTOpenLogFilePtr to_free = dlc_lru_open_log;
1013
if (!to_free || to_free->odl_in_use)
1016
/* Dirty read the file ID: */
1017
free_log_id = to_free->olf_log_id;
1019
seg = &dlc_segment[free_log_id & XT_DL_SEGMENT_MASK];
1021
/* Lock the segment: */
1022
xt_lock_mutex_ns(&seg->dls_lock);
1024
/* Lock the MRU list: */
1025
xt_lock_mutex_ns(&dlc_mru_lock);
1027
/* Check if we have the same open file: */
1028
if (dlc_lru_open_log == to_free && !to_free->odl_in_use) {
1029
data_log = to_free->odl_data_log;
1031
/* Remove from the MRU list: */
1032
dlc_lru_open_log = to_free->odl_mr_used;
1033
ASSERT_NS(!to_free->odl_lr_used);
1035
if (dlc_mru_open_log == to_free) {
1036
dlc_mru_open_log = to_free->odl_lr_used;
1037
ASSERT_NS(!to_free->odl_mr_used);
1039
else if (to_free->odl_mr_used)
1040
to_free->odl_mr_used->odl_lr_used = to_free->odl_lr_used;
1042
/* Remove from the free list of the file: */
1043
if (data_log->dlf_free_list == to_free) {
1044
data_log->dlf_free_list = to_free->odl_next_free;
1045
ASSERT_NS(!to_free->odl_prev_free);
1047
else if (to_free->odl_prev_free)
1048
to_free->odl_prev_free->odl_next_free = to_free->odl_next_free;
1049
if (to_free->odl_next_free)
1050
to_free->odl_next_free->odl_prev_free = to_free->odl_prev_free;
1051
ASSERT_NS(data_log->dlf_open_count > 0);
1052
data_log->dlf_open_count--;
1058
xt_unlock_mutex_ns(&dlc_mru_lock);
1059
xt_unlock_mutex_ns(&seg->dls_lock);
1062
xt_close_file_ns(to_free->odl_log_file);
1063
xt_free_ns(to_free);
1071
xt_unlock_mutex_ns(&seg->dls_lock);
1075
/*
 * Return a pooled open file to its log's free list, clear its in-use
 * flag and wake any thread waiting for exclusive access to the log.
 */
void XTDataLogCache::dlc_release_open_log(XTOpenLogFilePtr open_log)
1077
register XTDataLogSegPtr seg;
1078
register XTDataLogFilePtr data_log = open_log->odl_data_log;
1080
/* Which segment, and hash index: */
1081
seg = &dlc_segment[open_log->olf_log_id & XT_DL_SEGMENT_MASK];
1083
xt_lock_mutex_ns(&seg->dls_lock);
1084
open_log->odl_next_free = data_log->dlf_free_list;
1085
open_log->odl_prev_free = NULL;
1086
if (data_log->dlf_free_list)
1087
data_log->dlf_free_list->odl_prev_free = open_log;
1088
data_log->dlf_free_list = open_log;
1089
open_log->odl_in_use = FALSE;
1091
/* Wakeup any exclusive lockers: */
1092
if (!xt_broadcast_cond_ns(&seg->dls_cond))
1093
xt_log_and_clear_exception_ns();
1095
xt_unlock_mutex_ns(&seg->dls_lock);
1099
* --------------------------------------------------------------------------------
1100
* D A T A L O G F I L E
1103
/* Free space left before the log reaches the configured size threshold;
 * the zero-return for full logs is on a line elided from this chunk. */
off_t XTDataLogFile::dlf_space_avaliable()
1105
if (dlf_log_eof < xt_db_data_log_threshold)
1106
return xt_db_data_log_threshold - dlf_log_eof;
1110
/* TRUE when the garbage percentage reaches the compaction threshold.
 * NOTE(review): divides by dlf_log_eof — a zero-length log would divide
 * by zero; presumably guarded on a line elided from this chunk. */
xtBool XTDataLogFile::dlf_to_much_garbage()
1114
return dlf_garbage_count * 100 / dlf_log_eof >= xt_db_garbage_threshold;
1118
* --------------------------------------------------------------------------------
1119
* D A T A L O G B U F F E R
1122
/*
 * Initialize the per-thread data-log write buffer. Asserts the buffer is
 * in its pristine state; the actual buffer memory is allocated lazily in
 * dlb_get_log_offset().
 */
void XTDataLogBuffer::dlb_init(XTDatabaseHPtr db, size_t buffer_size)
1125
ASSERT_NS(!dlb_buffer_size);
1126
ASSERT_NS(!dlb_data_log);
1127
ASSERT_NS(!dlb_log_buffer);
1129
dlb_buffer_size = buffer_size;
1132
/*
 * Tear down the write buffer: flush/close the current log, free the
 * buffer memory and reset all bookkeeping fields.
 */
void XTDataLogBuffer::dlb_exit(XTThreadPtr self)
1134
dlb_close_log(self);
1135
if (dlb_log_buffer) {
1136
xt_free(self, dlb_log_buffer);
1137
dlb_log_buffer = NULL;
1140
dlb_buffer_offset = 0;
1141
dlb_buffer_size = 0;
1143
dlb_flush_required = FALSE;
1145
dlb_max_write_offset = 0;
1149
/*
 * Flush and commit outstanding data, then release the current data log
 * back to the cache (state recomputed). Safe to call with no log open.
 */
xtBool XTDataLogBuffer::dlb_close_log(XTThreadPtr thread)
1152
/* Flush and commit the data in the old log: */
1153
if (!dlb_flush_log(TRUE, thread))
1156
if (!dlb_db->db_datalogs.dlc_unlock_log(dlb_data_log))
1158
dlb_data_log = NULL;
1163
/* When I use 'thread' instead of 'self', this means
1164
* that I will not throw an error.
1166
/*
 * Reserve a write position: returns the ID and EOF offset of the current
 * data log, switching to (or creating) a new log when the current one is
 * full, and lazily allocating the write buffer on first use. Note the
 * EOF itself is only advanced after a successful write (see
 * dlb_write_thru_log / dlb_append_log).
 */
xtBool XTDataLogBuffer::dlb_get_log_offset(xtLogID *log_id, xtLogOffset *out_offset, size_t XT_UNUSED(req_size), struct XTThread *thread)
1168
/* Note, I am allowing a log to grow beyond the threshold.
1169
* The amount depends on the maximum extended record size.
1170
* If I don't some logs will never fill up, because of only having
1171
* a few more bytes available.
1173
if (!dlb_data_log || dlb_data_log->dlf_space_avaliable() == 0) {
1174
/* Release the old log: */
1175
if (!dlb_close_log(thread))
1178
if (!dlb_log_buffer) {
1179
if (!(dlb_log_buffer = (xtWord1 *) xt_malloc_ns(dlb_buffer_size)))
1183
/* I could use req_size instead of 1, but this would mean some logs
1184
* are never filled up.
1186
if (!(dlb_data_log = dlb_db->db_datalogs.dlc_get_log_for_writing(1, thread)))
1189
dlb_max_write_offset = dlb_data_log->dlf_log_eof;
1193
*log_id = dlb_data_log->dlf_log_id;
1194
*out_offset = dlb_data_log->dlf_log_eof;
1198
/*
 * Write any buffered data to the log file and, if 'commit' is set and a
 * flush is outstanding, fsync the file. No-op when no log is open.
 */
xtBool XTDataLogBuffer::dlb_flush_log(xtBool commit, XTThreadPtr thread)
1200
if (!dlb_data_log || !dlb_data_log->dlf_log_file)
1203
if (dlb_buffer_len) {
1204
if (!xt_pwrite_file(dlb_data_log->dlf_log_file, dlb_buffer_offset, dlb_buffer_len, dlb_log_buffer, &thread->st_statistics.st_data, thread))
1207
if (dlb_buffer_offset + dlb_buffer_len > dlb_max_write_offset)
1208
dlb_max_write_offset = dlb_buffer_offset + (xtLogOffset) dlb_buffer_len;
1211
dlb_flush_required = TRUE;
1214
if (commit && dlb_flush_required) {
1216
/* This would normally be equal, however, in the case
1217
* where some other thread flushes the compactors
1218
* data log, the eof, can be greater than the
1221
* This occurs because the flush can come between the
1222
* dlb_get_log_offset() and dlb_write_thru_log() calls.
1224
ASSERT_NS(dlb_data_log->dlf_log_eof >= dlb_max_write_offset);
1226
if (!xt_flush_file(dlb_data_log->dlf_log_file, &thread->st_statistics.st_data, thread))
1228
dlb_flush_required = FALSE;
1233
// Write 'size' bytes directly (write-through, bypassing the append buffer) at
// 'log_offset' in the current write log, then advance dlf_log_eof.
// The INJECT_ERROR machinery below is a disabled test hook.
// NOTE(review): lines are missing from this extract (bare numbers are original
// line numbers); code left byte-identical, comments only.
//#define INJECT_ERROR
1239
xtBool XTDataLogBuffer::dlb_write_thru_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOffset log_offset, size_t size, xtWord1 *data, XTThreadPtr thread)
1241
ASSERT_NS(log_id == dlb_data_log->dlf_log_id);
1244
// Any buffered data must reach the file before the direct write.
dlb_flush_log(FALSE, thread);
1248
if (inject_when > 1000 && inject_when < 1002)
1249
return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(dlb_data_log->dlf_log_file));
1251
if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, size, data, &thread->st_statistics.st_data, thread))
1253
/* Increment of dlb_data_log->dlf_log_eof was moved here from dlb_get_log_offset()
1254
* to ensure it is done after a successful update of the log, otherwise otherwise a
1255
* gap occurs in the log which cause eof to be detected in middle of the log
1257
dlb_data_log->dlf_log_eof += size;
1259
if (log_offset + size > dlb_max_write_offset)
1260
dlb_max_write_offset = log_offset + size;
1262
dlb_flush_required = TRUE;
1266
// Append 'size' bytes at 'log_offset' in the current write log, preferring the
// in-memory buffer: extend the buffer if the data is contiguous and fits,
// otherwise flush and restart the buffer, or write directly for oversized data.
// In every path dlf_log_eof is advanced by 'size'.
// NOTE(review): lines are missing from this extract (bare numbers are original
// line numbers); code left byte-identical, comments only.
xtBool XTDataLogBuffer::dlb_append_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOffset log_offset, size_t size, xtWord1 *data, XTThreadPtr thread)
1268
ASSERT_NS(log_id == dlb_data_log->dlf_log_id);
1270
if (dlb_buffer_len) {
1271
/* Should be the case, we only write by appending: */
1272
ASSERT_NS(dlb_buffer_offset + (xtLogOffset) dlb_buffer_len == log_offset);
1273
/* Check if we are appending to the existing value in the buffer: */
1274
if (dlb_buffer_offset + (xtLogOffset) dlb_buffer_len == log_offset) {
1275
/* Can we just append: */
1276
if (dlb_buffer_size >= dlb_buffer_len + size) {
1277
memcpy(dlb_log_buffer + dlb_buffer_len, data, size);
1278
dlb_buffer_len += size;
1279
dlb_data_log->dlf_log_eof += size;
1283
// Data not contiguous with (or too big for) the buffer: flush it first.
if (dlb_flush_log(FALSE, thread) != OK)
1287
ASSERT_NS(dlb_buffer_len == 0);
1289
if (dlb_buffer_size >= size) {
1290
// Start a fresh buffered run at log_offset.
dlb_buffer_offset = log_offset;
1291
dlb_buffer_len = size;
1292
memcpy(dlb_log_buffer, data, size);
1293
dlb_data_log->dlf_log_eof += size;
1297
/* Write directly: */
1298
if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, size, data, &thread->st_statistics.st_data, thread))
1301
if (log_offset + size > dlb_max_write_offset)
1302
dlb_max_write_offset = log_offset + size;
1304
dlb_flush_required = TRUE;
1305
dlb_data_log->dlf_log_eof += size;
1309
// Read 'size' bytes from (log_id, log_offset). Fast path: the request hits the
// current write log, possibly satisfied straight from the write buffer.
// Slow path: fetch an open handle for the other log from the log cache, read,
// release the handle, and zero-fill any short read past EOF.
// NOTE(review): lines are missing from this extract (bare numbers are original
// line numbers); code left byte-identical, comments only.
xtBool XTDataLogBuffer::dlb_read_log(xtLogID log_id, xtLogOffset log_offset, size_t size, xtWord1 *data, XTThreadPtr thread)
1312
XTOpenLogFilePtr open_log;
1314
if (dlb_data_log && log_id == dlb_data_log->dlf_log_id) {
1315
/* Reading from the write log, I can do this quicker: */
1316
if (dlb_buffer_len) {
1317
/* If it is in the buffer, then it is completely in the buffer. */
1318
if (log_offset >= dlb_buffer_offset) {
1319
if (log_offset + (xtLogOffset) size <= dlb_buffer_offset + (xtLogOffset) dlb_buffer_len) {
1320
memcpy(data, dlb_log_buffer + (log_offset - dlb_buffer_offset), size);
1323
/* Should not happen, reading past EOF: */
1325
memset(data, 0, size);
1328
/* In the write log, but not in the buffer,
1329
* must be completely not in the log,
1330
* because only whole records are written to the
1333
ASSERT_NS(log_offset + (xtLogOffset) size <= dlb_buffer_offset);
1335
return xt_pread_file(dlb_data_log->dlf_log_file, log_offset, size, size, data, NULL, &thread->st_statistics.st_data, thread);
1338
/* Read from some other log: */
1339
if (!dlb_db->db_datalogs.dlc_get_open_log(&open_log, log_id))
1342
if (!xt_pread_file(open_log->odl_log_file, log_offset, size, 0, data, &red_size, &thread->st_statistics.st_data, thread)) {
1343
// Release the cached log handle on the error path too.
dlb_db->db_datalogs.dlc_release_open_log(open_log);
1347
dlb_db->db_datalogs.dlc_release_open_log(open_log);
1349
if (red_size < size)
1350
// Short read: pad the remainder with zeros.
memset(data + red_size, 0, size - red_size);
1356
// Mark an extended record in a data log as deleted and account the freed bytes
// as garbage. The reference is first validated against the stored record
// header; mismatches raise XT_ERR_BAD_EXT_RECORD. The status byte is patched
// either in the write buffer, in the current write log, or in another log via
// the open-log cache, and the log's garbage counter is updated under
// dlc_head_lock. May also flag the log XT_DL_MAY_COMPACT.
// NOTE(review): lines are missing from this extract (bare numbers are original
// line numbers); code left byte-identical, comments only.
* We assume that the given reference may not be valid.
1357
* Only valid references actually cause a delete.
1358
* Invalid references are logged, and ignored.
1360
* Note this routine does not lock the compactor.
1361
* This can lead to the some incorrect calculation is the
1362
* amount of garbage. But nothing serious I think.
1364
xtBool XTDataLogBuffer::dlb_delete_log(xtLogID log_id, xtLogOffset log_offset, size_t size, xtTableID tab_id, xtRecordID rec_id, XTThreadPtr thread)
1366
XTactExtRecEntryDRec record;
1367
xtWord1 status = XT_LOG_ENT_EXT_REC_DEL;
1368
XTOpenLogFilePtr open_log;
1369
xtBool to_much_garbage;
1370
XTDataLogFilePtr data_log;
1372
// Fetch just the record header to validate the reference.
if (!dlb_read_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data), (xtWord1 *) &record, thread))
1375
/* Already deleted: */
1376
if (record.er_status_1 == XT_LOG_ENT_EXT_REC_DEL)
1379
if (record.er_status_1 != XT_LOG_ENT_EXT_REC_OK ||
1380
size != XT_GET_DISK_4(record.er_data_size_4) ||
1381
tab_id != XT_GET_DISK_4(record.er_tab_id_4) ||
1382
rec_id != XT_GET_DISK_4(record.er_rec_id_4)) {
1383
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_EXT_RECORD);
1387
if (dlb_data_log && log_id == dlb_data_log->dlf_log_id) {
1388
/* Writing to the write log, I can do this quicker: */
1389
if (dlb_buffer_len) {
1390
/* If it is in the buffer, then it is completely in the buffer. */
1391
if (log_offset >= dlb_buffer_offset) {
1392
if (log_offset + 1 <= dlb_buffer_offset + (xtLogOffset) dlb_buffer_len) {
1393
// Patch the status byte directly in the write buffer.
*(dlb_log_buffer + (log_offset - dlb_buffer_offset)) = XT_LOG_ENT_EXT_REC_DEL;
1394
goto inc_garbage_count;
1396
/* Should not happen, writing past EOF: */
1400
ASSERT_NS(log_offset + (xtLogOffset) size <= dlb_buffer_offset);
1403
if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, 1, &status, &thread->st_statistics.st_data, thread))
1407
// Garbage accounting on the current write log, under the header lock.
xt_lock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
1408
dlb_data_log->dlf_garbage_count += offsetof(XTactExtRecEntryDRec, er_data) + size;
1409
ASSERT_NS(dlb_data_log->dlf_garbage_count < dlb_data_log->dlf_log_eof);
1410
if (!dl_write_garbage_level(dlb_data_log, dlb_data_log->dlf_log_file, FALSE, thread)) {
1411
xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
1414
dlb_flush_required = TRUE;
1415
xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
1419
/* Write to some other log, open the log: */
1420
if (!dlb_db->db_datalogs.dlc_get_open_log(&open_log, log_id))
1423
/* Write the status byte: */
1424
if (!xt_pwrite_file(open_log->odl_log_file, log_offset, 1, &status, &thread->st_statistics.st_data, thread))
1427
data_log = open_log->odl_data_log;
1429
/* Adjust the garbage level in the header. */
1430
xt_lock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
1431
data_log->dlf_garbage_count += offsetof(XTactExtRecEntryDRec, er_data) + size;
1432
ASSERT_NS(data_log->dlf_garbage_count < data_log->dlf_log_eof);
1433
if (!dl_write_garbage_level(data_log, open_log->odl_log_file, FALSE, thread)) {
1434
xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
1437
to_much_garbage = data_log->dlf_to_much_garbage();
1438
xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
1440
if (to_much_garbage &&
1441
(data_log->dlf_state == XT_DL_HAS_SPACE || data_log->dlf_state == XT_DL_READ_ONLY)) {
1442
/* There is too much garbage, it may be compacted. */
1443
if (!dlb_db->db_datalogs.dls_set_log_state(data_log, XT_DL_MAY_COMPACT))
1447
/* Release the open log: */
1448
dlb_db->db_datalogs.dlc_release_open_log(open_log);
1453
dlb_db->db_datalogs.dlc_release_open_log(open_log);
1458
// Delete all extended (data-log resident) record data belonging to one table.
// Scans the table's record pages via the cache (see the deadlock note below),
// and for every record flagged EXT_DLOG marks its data-log entry deleted with
// dlb_delete_log(). Runs under db_co_ext_lock to keep the compactor from
// moving records during the scan. Benign errors (bad reference, missing log)
// are ignored; others are logged and cleared.
// NOTE(review): lines are missing from this extract (bare numbers are original
// line numbers); code left byte-identical, comments only.
* Delete all the extended data belonging to a particular
1461
xtPublic void xt_dl_delete_ext_data(XTThreadPtr self, XTTableHPtr tab, xtBool XT_UNUSED(missing_ok), xtBool have_table_lock)
1464
xtRecordID page_rec_id, offs_rec_id;
1465
XTTabRecExtDPtr rec_buf;
1466
xtWord4 log_over_size;
1468
xtLogOffset log_offset;
1471
page_data = (xtWord1 *) xt_malloc(self, tab->tab_recs.tci_page_size);
1472
pushr_(xt_free, page_data);
1474
/* Scan the table, and remove all exended data... */
1475
if (!(ot = xt_open_table(tab))) {
1476
// A missing table file is tolerated here (table already gone).
if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
1477
XT_FILE_NOT_FOUND(self->t_exception.e_sys_err))
1481
ot->ot_thread = self;
1483
/* {LOCK-EXT-REC} This lock is to stop the compactor changing records
1484
* while we are doing the delete.
1486
xt_lock_mutex_ns(&tab->tab_db->db_co_ext_lock);
1489
while (page_rec_id < tab->tab_rec_eof_id) {
1490
/* NOTE: There is a good reason for using xt_tc_read_page().
1491
* A deadlock can occur if using read, which can run out of
1492
* memory, which waits for the freeer, which may need to
1493
* open a table, which requires the db->db_tables lock,
1494
* which is owned by the this thread, when the function
1495
* is called from drop table.
1497
* xt_tc_read_page() should work because no more changes
1498
* should happen to the table while we are dropping it.
1500
if (!tab->tab_recs.xt_tc_read_page(ot->ot_rec_file, page_rec_id, page_data, self))
1503
for (offs_rec_id=0; offs_rec_id<tab->tab_recs.tci_rows_per_page && page_rec_id+offs_rec_id < tab->tab_rec_eof_id; offs_rec_id++) {
1504
rec_buf = (XTTabRecExtDPtr) (page_data + (offs_rec_id * tab->tab_recs.tci_rec_size));
1505
if (XT_REC_IS_EXT_DLOG(rec_buf->tr_rec_type_1)) {
1506
log_over_size = XT_GET_DISK_4(rec_buf->re_log_dat_siz_4);
1507
XT_GET_LOG_REF(log_id, log_offset, rec_buf);
1509
if (tab->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE)
1510
xt_tab_free_ext_slot(tab, log_id, log_offset, log_over_size);
1512
if (!self->st_dlog_buf.dlb_delete_log(log_id, log_offset, log_over_size, tab->tab_id, page_rec_id+offs_rec_id, self)) {
1513
// Bad references and missing logs are expected; log anything else.
if (self->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
1514
self->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
1515
xt_log_and_clear_exception(self);
1521
page_rec_id += tab->tab_recs.tci_rows_per_page;
1524
xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);
1526
xt_close_table(ot, TRUE, have_table_lock);
1528
freer_(); // xt_free(page_data)
1532
// NOTE(review): this unlock/close pair appears to be the error/cleanup path
// of the function (separated from the lines above by dropped source lines).
xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);
1534
xt_close_table(ot, TRUE, have_table_lock);
1539
* --------------------------------------------------------------------------------
1540
* GARBAGE COLLECTOR THREAD
1543
// Initialize the per-database data-log/compactor mutexes.
// NOTE(review): closing brace missing in this corrupted extract; code
// left byte-identical.
xtPublic void xt_dl_init_db(XTThreadPtr self, XTDatabaseHPtr db)
1545
xt_init_mutex_with_autoname(self, &db->db_co_ext_lock);
1546
xt_init_mutex_with_autoname(self, &db->db_co_dlog_lock);
1549
// Tear down the compactor state for a database: stop the compactor thread
// (normally already stopped), clear the thread pointer, and free the mutexes
// created by xt_dl_init_db().
xtPublic void xt_dl_exit_db(XTThreadPtr self, XTDatabaseHPtr db)
1551
xt_stop_compactor(self, db); // Already done!
1552
db->db_co_thread = NULL;
1553
xt_free_mutex(&db->db_co_ext_lock);
1554
xt_free_mutex(&db->db_co_dlog_lock);
1557
// Mark the data log 'log_id' for deletion by moving it to state XT_DL_TO_DELETE.
// NOTE(review): error-handling/brace lines missing in this corrupted extract.
xtPublic void xt_dl_set_to_delete(XTThreadPtr self, XTDatabaseHPtr db, xtLogID log_id)
1559
XTDataLogFilePtr data_log;
1561
if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, FALSE, NULL))
1564
if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_TO_DELETE))
1569
// Append a human-readable status report of all data logs to 'strbuf'.
// Collects every log ID from the segment hash tables into a sorted list,
// then prints per log: state, EOF, garbage bytes, garbage percentage and
// open count.
// NOTE(review): lines are missing from this extract (bare numbers are original
// line numbers, loop/switch braces and 'break's were dropped); code left
// byte-identical, comments only.
xtPublic void xt_dl_log_status(XTThreadPtr self, XTDatabaseHPtr db, XTStringBufferPtr strbuf)
1571
XTSortedListPtr list;
1572
XTDataLogFilePtr data_log;
1573
XTDataLogSegPtr seg;
1575
xtLogID *log_id_ptr;
1577
// Sorted list gives a stable, ascending report order of log IDs.
list = xt_new_sortedlist(self, sizeof(xtLogID), 20, 10, dl_cmp_log_id, NULL, NULL, FALSE, FALSE);
1578
pushr_(xt_free_sortedlist, list);
1580
for (u_int i=0; i<XT_DL_NO_OF_SEGMENTS; i++) {
1581
for (u_int j=0; j<XT_DL_SEG_HASH_TABLE_SIZE; j++) {
1582
seg = &db->db_datalogs.dlc_segment[i];
1583
data_log = seg->dls_hash_table[j];
1585
xt_sl_insert(self, list, &data_log->dlf_log_id, &data_log->dlf_log_id);
1586
data_log = data_log->dlf_next_hash;
1591
no_of_logs = xt_sl_get_size(list);
1592
for (u_int i=0; i<no_of_logs; i++) {
1593
log_id_ptr = (xtLogID *) xt_sl_item_at(list, i);
1594
if (!db->db_datalogs.dlc_get_data_log(&data_log, *log_id_ptr, FALSE, &seg))
1597
xt_sb_concat(self, strbuf, "d-log: ");
1598
xt_sb_concat_int8(self, strbuf, data_log->dlf_log_id);
1599
xt_sb_concat(self, strbuf, " status=");
1600
switch (data_log->dlf_state) {
1602
xt_sb_concat(self, strbuf, "?");
1604
case XT_DL_HAS_SPACE:
1605
xt_sb_concat(self, strbuf, "has-space ");
1607
case XT_DL_READ_ONLY:
1608
xt_sb_concat(self, strbuf, "read-only ");
1610
case XT_DL_TO_COMPACT:
1611
xt_sb_concat(self, strbuf, "to-compact");
1613
case XT_DL_COMPACTED:
1614
xt_sb_concat(self, strbuf, "compacted ");
1616
case XT_DL_TO_DELETE:
1617
xt_sb_concat(self, strbuf, "to-delete ");
1620
xt_sb_concat(self, strbuf, "deleted ");
1622
case XT_DL_EXCLUSIVE:
1623
xt_sb_concat(self, strbuf, "x-locked ");
1626
xt_sb_concat(self, strbuf, " eof=");
1627
xt_sb_concat_int8(self, strbuf, data_log->dlf_log_eof);
1628
xt_sb_concat(self, strbuf, " garbage=");
1629
xt_sb_concat_int8(self, strbuf, data_log->dlf_garbage_count);
1630
xt_sb_concat(self, strbuf, " g%=");
1631
if (data_log->dlf_log_eof)
1632
// Garbage percentage; guard against division by zero on an empty log.
xt_sb_concat_int8(self, strbuf, data_log->dlf_garbage_count * 100 / data_log->dlf_log_eof);
1634
xt_sb_concat(self, strbuf, "100");
1635
xt_sb_concat(self, strbuf, " open=");
1636
xt_sb_concat_int8(self, strbuf, data_log->dlf_open_count);
1637
xt_sb_concat(self, strbuf, "\n");
1639
xt_unlock_mutex_ns(&seg->dls_lock);
1642
freer_(); // xt_free_sortedlist(list)
1645
// Delete all data-log files of a database: scan the database's data
// directory, remove each recognizable data log via the log cache, delete any
// leftover "*.xt" files, and finally attempt to remove the directory itself
// (failure there is only logged).
// NOTE(review): lines are missing from this corrupted extract; code left
// byte-identical, comments only.
xtPublic void xt_dl_delete_logs(XTThreadPtr self, XTDatabaseHPtr db)
1647
char path[PATH_MAX];
1652
xt_strcpy(PATH_MAX, path, db->db_main_path);
1653
xt_add_data_dir(PATH_MAX, path);
1654
if (!xt_fs_exists(path))
1656
pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
1657
while (xt_dir_next(self, od)) {
1658
file = xt_dir_name(self, od);
1659
if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
1660
if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))
1661
xt_log_and_clear_exception(self);
1663
if (xt_ends_with(file, ".xt")) {
1664
// Build full path, delete the file, then restore 'path' to the directory.
xt_add_dir_char(PATH_MAX, path);
1665
xt_strcat(PATH_MAX, path, file);
1666
xt_fs_delete(self, path);
1667
xt_remove_last_name_of_path(path);
1670
freer_(); // xt_dir_close(od)
1672
/* I no longer attach the condition: !db->db_multi_path
1673
* to removing this directory. This is because
1674
* the pbxt directory must now be removed explicitly
1675
* by drop database, or by delete all the PBXT
1678
if (!xt_fs_rmdir(NULL, path))
1679
xt_log_and_clear_exception(self);
1682
// Working state of one garbage-collection (compaction) pass:
// a sequential reader over the source log, the currently open table,
// and a scratch buffer for record data being moved.
typedef struct XTCompactorState {
1683
XTSeqLogReadPtr cs_seqread;      // Sequential reader on the source data log.
1684
XTOpenTablePtr cs_ot;            // Pool table handle for the current table.
1685
XTDataBufferRec cs_databuf;      // Scratch buffer for the record being moved.
1686
} XTCompactorStateRec, *XTCompactorStatePtr;
1688
// Cleanup callback (used with pushr_) releasing everything owned by an
// XTCompactorState: the sequential reader, the pooled table handle, and the
// data buffer.
static void dl_free_compactor_state(XTThreadPtr self, XTCompactorStatePtr cs)
1690
if (cs->cs_seqread) {
1691
cs->cs_seqread->sl_seq_exit();
1692
delete cs->cs_seqread;
1693
cs->cs_seqread = NULL;
1696
xt_db_return_table_to_pool(self, cs->cs_ot);
1699
// Size 0 releases the buffer storage.
xt_db_set_size(self, &cs->cs_databuf, 0);
1702
// Return a pooled open-table handle for 'tab_id', caching it in the compactor
// state: reuse cs_ot when it is already the right table, otherwise return the
// old handle to the pool and open the requested table.
// NOTE(review): return statements/braces missing in this corrupted extract.
static XTOpenTablePtr dl_cs_get_open_table(XTThreadPtr self, XTCompactorStatePtr cs, xtTableID tab_id)
1705
if (cs->cs_ot->ot_table->tab_id == tab_id)
1708
xt_db_return_table_to_pool(self, cs->cs_ot);
1713
if (!(cs->cs_ot = xt_db_open_pool_table(self, self->st_database, tab_id, NULL, TRUE)))
1720
// Block the compactor thread for up to 'secs' seconds on the data-log
// condition variable (woken early by dl_wake_co_thread). The lock is pushed
// for exception-safe unlock.
static void dl_co_wait(XTThreadPtr self, XTDatabaseHPtr db, u_int secs)
1722
xt_lock_mutex(self, &db->db_datalogs.dlc_lock);
1723
pushr_(xt_unlock_mutex, &db->db_datalogs.dlc_lock);
1725
// Timeout is given in milliseconds.
xt_timed_wait_cond(self, &db->db_datalogs.dlc_cond, &db->db_datalogs.dlc_lock, secs * 1000);
1726
freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
1730
// Compact one data log: sequentially scan it, copy every still-valid extended
// record into the current destination log (updating the owning table row to
// point at the new location via an XT_LOG_ENT_REC_MOVED log record), account
// the moved bytes as garbage in the source log, flush destination and
// transaction logs, wait for the writer to catch up, then mark the source log
// XT_DL_COMPACTED and write an XT_LOG_ENT_DEL_LOG record so it is deleted at
// the next checkpoint.
// NOTE(review): this extract is heavily corrupted -- bare numeric lines are
// original line numbers and many statements (braces, returns, loop headers)
// were dropped. Code left byte-identical; only line comments added.
* Collect all the garbage in a file by moving all valid records
1731
* into some other data log and updating the handles.
1733
static xtBool dl_collect_garbage(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogFilePtr data_log)
1735
XTXactLogBufferDPtr record;
1739
XTCompactorStateRec cs;
1742
XTTabRecExtDRec rec_buffer;
1745
xtLogOffset src_log_offset;
1746
xtLogID curr_log_id;
1747
xtLogOffset curr_log_offset;
1748
xtLogID dest_log_id;
1749
xtLogOffset dest_log_offset;
1750
off_t garbage_count = 0;
1752
memset(&cs, 0, sizeof(XTCompactorStateRec));
1754
if (!(cs.cs_seqread = new XTDataSeqRead()))
1755
xt_throw_errno(XT_CONTEXT, XT_ENOMEM);
1757
if (!cs.cs_seqread->sl_seq_init(db, xt_db_log_buffer_size)) {
1758
delete cs.cs_seqread;
1761
pushr_(dl_free_compactor_state, &cs);
1763
// Resume scanning from where a previous pass stopped (dlf_start_offset).
if (!cs.cs_seqread->sl_seq_start(data_log->dlf_log_id, data_log->dlf_start_offset, FALSE))
1768
/* Flush the destination log: */
1769
xt_lock_mutex(self, &db->db_co_dlog_lock);
1770
pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
1771
if (!self->st_dlog_buf.dlb_flush_log(TRUE, self))
1773
freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
1775
/* Flush the transaction log. */
1776
if (!xt_xlog_flush_log(db, self))
1779
xt_lock_mutex_ns(&db->db_datalogs.dlc_head_lock);
1780
data_log->dlf_garbage_count += garbage_count;
1781
ASSERT(data_log->dlf_garbage_count < data_log->dlf_log_eof);
1782
if (!dl_write_garbage_level(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
1783
xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
1786
xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
1788
freer_(); // dl_free_compactor_state(&cs)
1791
// Main scan loop body (loop header lost in extraction): read next record.
if (!cs.cs_seqread->sl_seq_next(&record, self))
1793
cs.cs_seqread->sl_seq_pos(&curr_log_id, &curr_log_offset);
1795
data_log->dlf_start_offset = curr_log_offset;
1798
switch (record->xh.xh_status_1) {
1799
case XT_LOG_ENT_EXT_REC_OK:
1800
size = XT_GET_DISK_4(record->er.er_data_size_4);
1801
tab_id = XT_GET_DISK_4(record->er.er_tab_id_4);
1802
rec_id = XT_GET_DISK_4(record->er.er_rec_id_4);
1804
if (!(ot = dl_cs_get_open_table(self, &cs, tab_id)))
1808
/* All this is required for a valid record address: */
1809
if (!rec_id || rec_id >= tab->tab_rec_eof_id)
1812
/* {LOCK-EXT-REC} It is important to prevent the compactor from modifying
1813
* a record that has been freed (and maybe allocated again).
1815
* Consider the following sequence:
1817
* 1. Compactor reads the record.
1818
* 2. The record is freed and reallocated.
1819
* 3. The compactor updates the record.
1821
* To prevent this, the compactor locks out the
1822
* sweeper using the db_co_ext_lock lock. The db_co_ext_lock lock
1823
* prevents a extended record from being moved and removed at the
1826
* The compactor also checks the status of the record before
1829
xt_lock_mutex(self, &db->db_co_ext_lock);
1830
pushr_(xt_unlock_mutex, &db->db_co_ext_lock);
1832
/* Read the record: */
1833
if (!xt_tab_get_rec_data(ot, rec_id, offsetof(XTTabRecExtDRec, re_data), (xtWord1 *) &rec_buffer)) {
1834
xt_log_and_clear_warning(self);
1835
freer_(); // xt_unlock_mutex(&db->db_co_ext_lockk)
1839
/* [(7)] REMOVE is followed by FREE:
1840
if (XT_REC_IS_REMOVED(rec_buffer.tr_rec_type_1) || !XT_REC_IS_EXT_DLOG(rec_buffer.tr_rec_type_1)) {
1842
if (!XT_REC_IS_EXT_DLOG(rec_buffer.tr_rec_type_1)) {
1843
freer_(); // xt_unlock_mutex(&db->db_co_ext_lock)
1847
XT_GET_LOG_REF(src_log_id, src_log_offset, &rec_buffer);
1848
src_size = (size_t) XT_GET_DISK_4(rec_buffer.re_log_dat_siz_4);
1850
/* Does the record agree with the current position: */
1851
if (curr_log_id != src_log_id ||
1852
curr_log_offset != src_log_offset ||
1854
freer_(); // xt_unlock_mutex(&db->db_co_ext_lock)
1858
// Total on-disk size = entry header + data payload.
size = offsetof(XTactExtRecEntryDRec, er_data) + size;
1860
/* Allocate space in a destination log: */
1861
xt_lock_mutex(self, &db->db_co_dlog_lock);
1862
pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
1863
if (!self->st_dlog_buf.dlb_get_log_offset(&dest_log_id, &dest_log_offset, size, self))
1865
freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
1867
/* This record is referenced by the data: */
1868
xt_db_set_size(self, &cs.cs_databuf, size);
1869
if (!cs.cs_seqread->sl_rnd_read(src_log_offset, size, cs.cs_databuf.db_data, NULL, self))
1872
/* The problem with writing to the buffer here, is that other
1873
* threads want to read the data! */
1874
xt_lock_mutex(self, &db->db_co_dlog_lock);
1875
pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
1876
if (!self->st_dlog_buf.dlb_write_thru_log(dest_log_id, dest_log_offset, size, cs.cs_databuf.db_data, self))
1878
freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
1880
/* Make sure we flush the compactor target log, before we
1881
* flush the transaction log!!
1882
* This is done here [(8)]
1885
// Repoint the table row at the record's new (log, offset) location.
XT_SET_LOG_REF(&rec_buffer, dest_log_id, dest_log_offset);
1887
if (!xt_tab_put_log_rec_data(ot, XT_LOG_ENT_REC_MOVED, 0, rec_id, 8, (xtWord1 *) &rec_buffer.re_log_id_2, &op_seq))
1889
tab->tab_co_op_seq = op_seq;
1891
/* Only records that were actually moved, count as garbage now!
1892
* This means, lost records, remain "lost" as far as the garbage
1893
* count is concerned!
1895
garbage_count += size;
1896
freer_(); // xt_unlock_mutex(&db->db_co_ext_lock)
1899
data_log->dlf_start_offset = curr_log_offset;
1902
/* Flush the distination log. */
1903
xt_lock_mutex(self, &db->db_co_dlog_lock);
1904
pushr_(xt_unlock_mutex, &db->db_co_dlog_lock);
1905
if (!self->st_dlog_buf.dlb_flush_log(TRUE, self))
1907
freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
1909
/* Flush the transaction log. */
1910
if (!xt_xlog_flush_log(db, self))
1913
/* Save state in source log header. */
1914
xt_lock_mutex_ns(&db->db_datalogs.dlc_head_lock);
1915
data_log->dlf_garbage_count += garbage_count;
1916
ASSERT(data_log->dlf_garbage_count < data_log->dlf_log_eof);
1917
if (!dl_write_garbage_level(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
1918
xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
1921
xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
1923
/* Wait for the writer to write all the changes.
1924
* Then we can start the delete process for the log:
1926
* Note, if we do not wait, then it could be some operations are held up,
1927
* by being out of sequence. This could cause the log to be deleted
1928
* before all the operations have been performed (which are on a table
1934
XTTableEntryPtr tab_ptr;
1938
freer_(); // dl_free_compactor_state(&cs)
1942
// Poll all tables until the writer has applied every compactor operation
// (tab_co_op_seq caught up by tab_head_op_seq).
xt_ht_lock(self, db->db_tables);
1943
pushr_(xt_ht_unlock, db->db_tables);
1944
xt_enum_tables_init(&edx);
1945
while ((tab_ptr = xt_enum_tables_next(self, db, &edx))) {
1946
if (tab_ptr->te_table && tab_ptr->te_table->tab_co_op_seq > tab_ptr->te_table->tab_head_op_seq) {
1951
freer_(); // xt_ht_unlock(db->db_tables)
1956
/* Nobody will wake me, so check again shortly! */
1957
dl_co_wait(self, db, 1);
1960
db->db_datalogs.dls_set_log_state(data_log, XT_DL_COMPACTED);
1962
#ifdef DEBUG_LOG_DELETE
1963
printf("-- MARK FOR DELETE IN LOG: %d\n", (int) data_log->dlf_log_id);
1965
/* Log that this log should be deleted on the next checkpoint: */
1966
// transaction log...
1967
XTXactNewLogEntryDRec log_rec;
1968
log_rec.xl_status_1 = XT_LOG_ENT_DEL_LOG;
1969
log_rec.xl_checksum_1 = XT_CHECKSUM_1(data_log->dlf_log_id);
1970
XT_SET_DISK_4(log_rec.xl_log_id_4, data_log->dlf_log_id);
1971
if (!xt_xlog_log_data(self, sizeof(XTXactNewLogEntryDRec), (XTXactLogBufferDPtr) &log_rec, XT_XLOG_WRITE_AND_FLUSH)) {
1972
// Logging the delete failed: put the log back in the to-compact state.
db->db_datalogs.dls_set_log_state(data_log, XT_DL_TO_COMPACT);
1976
freer_(); // dl_free_compactor_state(&cs)
1980
// Cleanup callback (used with pushr_): clear the compactor-busy flag.
static void dl_co_not_busy(XTThreadPtr XT_UNUSED(self), XTDatabaseHPtr db)
1982
db->db_co_busy = FALSE;
1985
// Main loop of the compactor (garbage collector) thread: run at low priority,
// repeatedly take the first log ID from the to-compact list, collect its
// garbage (with db_co_busy set around the work), remove it from the list,
// then sleep up to 120s waiting for more work.
// NOTE(review): lines are missing from this corrupted extract (bare numbers
// are original line numbers); code left byte-identical, comments only.
static void dl_co_main(XTThreadPtr self, xtBool once_off)
1987
XTDatabaseHPtr db = self->st_database;
1988
xtLogID *log_id_ptr, log_id;
1989
XTDataLogFilePtr data_log = NULL;
1991
xt_set_low_priority(self);
1993
while (!self->t_quit) {
1994
while (!self->t_quit) {
1995
xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
1996
if ((log_id_ptr = (xtLogID *) xt_sl_first_item(db->db_datalogs.dlc_to_compact))) {
1997
// Copy the ID before releasing the lock -- the list may change.
log_id = *log_id_ptr;
2001
xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
2004
if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, FALSE, NULL))
2008
db->db_co_busy = TRUE;
2009
pushr_(dl_co_not_busy, db);
2010
dl_collect_garbage(self, db, data_log);
2011
freer_(); // dl_co_not_busy(db)
2014
xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
2015
xt_sl_delete(self, db->db_datalogs.dlc_to_compact, &log_id);
2016
xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
2023
/* Wait for a signal that a data log can be collected: */
2024
dl_co_wait(self, db, 120);
2028
// Thread entry point for the compactor daemon: attach a MySQL thread context,
// use the database (then release the extra heap reference -- see the
// {BACKGROUND-RELEASE-DB} note), run dl_co_main(), and on exception log it
// (except a SIGTERM quit signal), detach from the database and pause before
// retrying. Destroys the MySQL thread context on the way out.
// NOTE(review): lines are missing from this corrupted extract (bare numbers
// are original line numbers); code left byte-identical, comments only.
static void *dl_run_co_thread(XTThreadPtr self)
2030
XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data;
2034
if (!(mysql_thread = myxt_create_thread()))
2037
while (!self->t_quit) {
2040
* The garbage collector requires that the database
2041
* is in use because.
2043
xt_use_database(self, db, XT_FOR_COMPACTOR);
2045
/* {BACKGROUND-RELEASE-DB}
2046
* This action is both safe and required:
2048
* safe: releasing the database is safe because as
2049
* long as this thread is running the database
2050
* reference is valid, and this reference cannot
2051
* be the only one to the database because
2052
* otherwize this thread would not be running.
2054
* required: releasing the database is necessary
2055
* otherwise we cannot close the database
2056
* correctly because we only shutdown this
2057
* thread when the database is closed and we
2058
* only close the database when all references
2061
xt_heap_release(self, self->st_database);
2063
dl_co_main(self, FALSE);
2066
// A SIGTERM-caused quit is expected at shutdown; anything else is logged.
if (!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
2067
self->t_exception.e_sys_err == SIGTERM))
2068
xt_log_and_clear_exception(self);
2072
/* Avoid releasing the database (done above) */
2073
self->st_database = NULL;
2074
xt_unuse_database(self, self);
2076
/* After an exception, pause before trying again... */
2077
/* Number of seconds */
2083
while (!self->t_quit && count > 0) {
2090
* {MYSQL-THREAD-KILL}
2091
myxt_destroy_thread(mysql_thread, TRUE);
2096
// Thread-data destructor for the compactor thread: clear the database's
// db_co_thread pointer under dlc_lock so no one signals a dead thread.
static void dl_free_co_thread(XTThreadPtr self, void *data)
2098
XTDatabaseHPtr db = (XTDatabaseHPtr) data;
2100
if (db->db_co_thread) {
2101
xt_lock_mutex(self, &db->db_datalogs.dlc_lock);
2102
pushr_(xt_unlock_mutex, &db->db_datalogs.dlc_lock);
2103
db->db_co_thread = NULL;
2104
freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
2108
// Create and start the garbage-collector ("GC-<dbdir>") daemon thread for a
// database, registering dl_free_co_thread as its cleanup handler.
xtPublic void xt_start_compactor(XTThreadPtr self, XTDatabaseHPtr db)
2110
char name[PATH_MAX];
2112
// Thread name derived from the database directory name.
sprintf(name, "GC-%s", xt_last_directory_of_path(db->db_main_path));
2113
xt_remove_dir_char(name);
2114
db->db_co_thread = xt_create_daemon(self, name);
2115
xt_set_thread_data(db->db_co_thread, db, dl_free_co_thread);
2116
xt_run_thread(self, db->db_co_thread, dl_run_co_thread);
2119
// Wake the compactor thread waiting in dl_co_wait(); a failed signal is only
// logged (non-throwing path, NULL thread).
static void dl_wake_co_thread(XTDatabaseHPtr db)
2121
if (!xt_signal_cond(NULL, &db->db_datalogs.dlc_cond))
2122
xt_log_and_clear_exception_ns();
2125
xtPublic void xt_stop_compactor(XTThreadPtr self, XTDatabaseHPtr db)
2129
if (db->db_co_thread) {
2130
xt_lock_mutex(self, &db->db_datalogs.dlc_lock);
2131
pushr_(xt_unlock_mutex, &db->db_datalogs.dlc_lock);
2133
/* This pointer is safe as long as you have the transaction lock. */
2134
if ((thr_co = db->db_co_thread)) {
2135
xtThreadID tid = thr_co->t_id;
2137
/* Make sure the thread quits when woken up. */
2138
xt_terminate_thread(self, thr_co);
2140
dl_wake_co_thread(db);
2142
freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)
2145
* This seems to kill the whole server sometimes!!
2146
* SIGTERM is going to a different thread??!
2147
xt_kill_thread(thread);
2149
xt_wait_for_thread_to_exit(tid, FALSE);
2151
/* PMC - This should not be necessary to set the signal here, but in the
2152
* debugger the handler is not called!!?
2153
thr_co->t_delayed_signal = SIGTERM;
2154
xt_kill_thread(thread);
2156
db->db_co_thread = NULL;
2159
freer_(); // xt_unlock_mutex(&db->db_datalogs.dlc_lock)