1
/* Copyright (c) 2007 PrimeBase Technologies GmbH
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* 2007-10-30 Paul McCullagh
23
* The transaction log contains all operations on the data handle
24
* and row pointer files of a table.
26
* The transaction log does not contain operations on index data.
29
#include "xt_config.h"
37
#include "xactlog_xt.h"
38
#include "database_xt.h"
40
#include "strutil_xt.h"
41
#include "filesys_xt.h"
46
//#define PRINT_TABLE_MODIFICATIONS
47
//#define TRACE_WRITER_ACTIVITY
51
#define PREWRITE_LOG_COMPLETELY
55
static void xlog_wr_log_written(XTDatabaseHPtr db);
58
* -----------------------------------------------------------------------
59
* T R A N S A C T I O L O G C A C H E
62
static XTXLogCacheRec xt_xlog_cache;
65
* Initialize the disk cache.
67
xtPublic void xt_xlog_init(XTThreadPtr self, size_t cache_size)
72
* This is required to ensure that the block
76
/* Determine the number of block that will fit into the given memory: */
78
xt_xlog_cache.xlc_hash_size = (cache_size / (XLC_SEGMENT_COUNT * sizeof(XTXLogBlockPtr) + sizeof(XTXLogBlockRec))) / (XLC_SEGMENT_COUNT >> 1);
79
xt_xlog_cache.xlc_block_count = (cache_size - (XLC_SEGMENT_COUNT * xt_xlog_cache.xlc_hash_size * sizeof(XTXLogBlockPtr))) / sizeof(XTXLogBlockRec);
81
/* Do not count the size of the cache directory towards the cache size: */
82
xt_xlog_cache.xlc_block_count = cache_size / sizeof(XTXLogBlockRec);
83
xt_xlog_cache.xlc_upper_limit = ((xtWord8) xt_xlog_cache.xlc_block_count * (xtWord8) XT_XLC_BLOCK_SIZE * (xtWord8) 3) / (xtWord8) 4;
84
xt_xlog_cache.xlc_hash_size = xt_xlog_cache.xlc_block_count / (XLC_SEGMENT_COUNT >> 1);
85
if (!xt_xlog_cache.xlc_hash_size)
86
xt_xlog_cache.xlc_hash_size = 1;
89
for (u_int i=0; i<XLC_SEGMENT_COUNT; i++) {
90
xt_xlog_cache.xlc_segment[i].lcs_hash_table = (XTXLogBlockPtr *) xt_calloc(self, xt_xlog_cache.xlc_hash_size * sizeof(XTXLogBlockPtr));
91
xt_init_mutex_with_autoname(self, &xt_xlog_cache.xlc_segment[i].lcs_lock);
92
xt_init_cond(self, &xt_xlog_cache.xlc_segment[i].lcs_cond);
95
block = (XTXLogBlockPtr) xt_malloc(self, xt_xlog_cache.xlc_block_count * sizeof(XTXLogBlockRec));
96
xt_xlog_cache.xlc_blocks = block;
97
xt_xlog_cache.xlc_blocks_end = (XTXLogBlockPtr) ((char *) block + (xt_xlog_cache.xlc_block_count * sizeof(XTXLogBlockRec)));
98
xt_xlog_cache.xlc_next_to_free = block;
99
xt_init_mutex_with_autoname(self, &xt_xlog_cache.xlc_lock);
100
xt_init_cond(self, &xt_xlog_cache.xlc_cond);
102
for (u_int i=0; i<xt_xlog_cache.xlc_block_count; i++) {
103
block->xlb_address = 0;
104
block->xlb_log_id = 0;
105
block->xlb_state = XLC_BLOCK_FREE;
108
xt_xlog_cache.xlc_free_count = xt_xlog_cache.xlc_block_count;
113
xt_enter_exception_handler(self, &e);
115
xt_exit_exception_handler(self, &e);
121
xtPublic void xt_xlog_exit(XTThreadPtr)
123
for (u_int i=0; i<XLC_SEGMENT_COUNT; i++) {
124
if (xt_xlog_cache.xlc_segment[i].lcs_hash_table) {
125
xt_free_ns(xt_xlog_cache.xlc_segment[i].lcs_hash_table);
126
xt_xlog_cache.xlc_segment[i].lcs_hash_table = NULL;
127
xt_free_mutex(&xt_xlog_cache.xlc_segment[i].lcs_lock);
128
xt_free_cond(&xt_xlog_cache.xlc_segment[i].lcs_cond);
132
if (xt_xlog_cache.xlc_blocks) {
133
xt_free_ns(xt_xlog_cache.xlc_blocks);
134
xt_xlog_cache.xlc_blocks = NULL;
135
xt_free_mutex(&xt_xlog_cache.xlc_lock);
136
xt_free_cond(&xt_xlog_cache.xlc_cond);
138
memset(&xt_xlog_cache, 0, sizeof(xt_xlog_cache));
141
xtPublic xtInt8 xt_xlog_get_usage()
145
size = (xtInt8) (xt_xlog_cache.xlc_block_count - xt_xlog_cache.xlc_free_count) * sizeof(XTXLogBlockRec);
149
xtPublic xtInt8 xt_xlog_get_size()
153
size = (xtInt8) xt_xlog_cache.xlc_block_count * sizeof(XTXLogBlockRec);
157
xtPublic xtLogID xt_xlog_get_min_log(XTThreadPtr self, XTDatabaseHPtr db)
162
xtLogID log_id, min_log = 0;
164
xt_strcpy(PATH_MAX, path, db->db_main_path);
165
xt_add_system_dir(PATH_MAX, path);
166
if (xt_fs_exists(path)) {
167
pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
168
while (xt_dir_next(self, od)) {
169
file = xt_dir_name(self, od);
170
if (xt_starts_with(file, "xlog")) {
171
if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
172
if (!min_log || log_id < min_log)
177
freer_(); // xt_dir_close(od)
184
xtPublic void xt_xlog_delete_logs(XTThreadPtr self, XTDatabaseHPtr db)
190
/* Close all the index logs before we delete them: */
191
db->db_indlogs.ilp_close(self, TRUE);
193
/* Close the transaction logs too: */
194
db->db_xlog.xlog_close(self);
196
xt_strcpy(PATH_MAX, path, db->db_main_path);
197
xt_add_system_dir(PATH_MAX, path);
198
if (!xt_fs_exists(path))
200
pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
201
while (xt_dir_next(self, od)) {
202
file = xt_dir_name(self, od);
203
if (xt_ends_with(file, ".xt")) {
204
xt_add_dir_char(PATH_MAX, path);
205
xt_strcat(PATH_MAX, path, file);
206
xt_fs_delete(self, path);
207
xt_remove_last_name_of_path(path);
210
freer_(); // xt_dir_close(od)
212
/* I no longer attach the condition: !db->db_multi_path
213
* to removing this directory. This is because
214
* the pbxt directory must now be removed explicitly
215
* by drop database, or by delete all the PBXT
218
if (!xt_fs_rmdir(NULL, path))
219
xt_log_and_clear_exception(self);
222
#ifdef DEBUG_CHECK_CACHE
223
static void xt_xlog_check_cache(void)
225
XTXLogBlockPtr block, pblock;
229
// Check the LRU list:
232
block = xt_xlog_cache.xlc_lru_block;
235
ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
236
ASSERT_NS(block->xlb_lr_used == pblock);
238
block = block->xlb_mr_used;
240
ASSERT_NS(xt_xlog_cache.xlc_mru_block == pblock);
241
ASSERT_NS(xt_xlog_cache.xlc_free_count + used_count == xt_xlog_cache.xlc_block_count);
243
// Check the free list:
245
block = xt_xlog_cache.xlc_free_list;
248
ASSERT_NS(block->xlb_state == XLC_BLOCK_FREE);
249
block = block->xlb_next;
251
ASSERT_NS(xt_xlog_cache.xlc_free_count == free_count);
256
static void xlog_check_lru_list(XTXLogBlockPtr block)
258
XTXLogBlockPtr list_block, plist_block;
261
list_block = xt_xlog_cache.xlc_lru_block;
263
ASSERT_NS(block != list_block);
264
ASSERT_NS(list_block->xlb_lr_used == plist_block);
265
plist_block = list_block;
266
list_block = list_block->xlb_mr_used;
268
ASSERT_NS(xt_xlog_cache.xlc_mru_block == plist_block);
273
* Log cache blocks are used and freed on a round-robin basis.
274
* In addition, only data read by restart, and data transfered
275
* from the transaction log are stored in the transaction log.
277
* This ensures that the transaction log contains the most
278
* recently written log data.
280
* If the sweeper gets behind due to a long running transacation
281
* then it falls out of the log cache, and must read from
282
* the log files directly.
284
* This data read is no longer cached as it was previously.
285
* This has the advantage that it does not disturn the writter
286
* thread which would otherwise hit the cache.
288
* If transactions are not too long, it should be possible
289
* to keep the sweeper in the log cache.
291
static xtBool xlog_free_block(XTXLogBlockPtr to_free)
293
XTXLogBlockPtr block, pblock;
296
XTXLogCacheSegPtr seg;
300
log_id = to_free->xlb_log_id;
301
address = to_free->xlb_address;
303
seg = &xt_xlog_cache.xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
304
hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % xt_xlog_cache.xlc_hash_size;
306
xt_lock_mutex_ns(&seg->lcs_lock);
307
if (to_free->xlb_state == XLC_BLOCK_FREE)
309
if (to_free->xlb_log_id != log_id || to_free->xlb_address != address) {
310
xt_unlock_mutex_ns(&seg->lcs_lock);
315
block = seg->lcs_hash_table[hash_idx];
317
if (block->xlb_address == address && block->xlb_log_id == log_id) {
318
ASSERT_NS(block == to_free);
319
ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
321
/* Wait if the block is being read: */
322
if (block->xlb_state == XLC_BLOCK_READING) {
323
/* Wait for the block to be read, then try again. */
324
if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100))
326
xt_unlock_mutex_ns(&seg->lcs_lock);
333
block = block->xlb_next;
336
/* We did not find the block, someone else freed it... */
337
xt_unlock_mutex_ns(&seg->lcs_lock);
341
ASSERT_NS(block->xlb_state == XLC_BLOCK_CLEAN);
343
/* Remove from the hash table: */
345
pblock->xlb_next = block->xlb_next;
347
seg->lcs_hash_table[hash_idx] = block->xlb_next;
349
/* Free the block: */
350
xt_xlog_cache.xlc_free_count++;
351
block->xlb_state = XLC_BLOCK_FREE;
354
xt_unlock_mutex_ns(&seg->lcs_lock);
358
xt_unlock_mutex_ns(&seg->lcs_lock);
362
#define XT_FETCH_READ 0
363
#define XT_FETCH_BLANK 1
364
#define XT_FETCH_TEST 2
366
static xtBool xlog_fetch_block(XTXLogBlockPtr *ret_block, XTOpenFilePtr file, xtLogID log_id, off_t address, XTXLogCacheSegPtr *ret_seg, int fetch_type, XTThreadPtr thread)
368
register XTXLogBlockPtr block;
369
register XTXLogCacheSegPtr seg;
370
register u_int hash_idx;
371
register XTXLogCacheRec *dcg = &xt_xlog_cache;
374
/* Make sure we have a free block ready (to avoid unlock below): */
375
if (fetch_type != XT_FETCH_TEST && dcg->xlc_next_to_free->xlb_state != XLC_BLOCK_FREE) {
376
if (!xlog_free_block(dcg->xlc_next_to_free))
380
seg = &dcg->xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
381
hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % dcg->xlc_hash_size;
383
xt_lock_mutex_ns(&seg->lcs_lock);
385
block = seg->lcs_hash_table[hash_idx];
387
if (block->xlb_address == address && block->xlb_log_id == log_id) {
388
ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
391
* Wait if the block is being read.
393
if (block->xlb_state == XLC_BLOCK_READING) {
394
if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100)) {
395
xt_unlock_mutex_ns(&seg->lcs_lock);
403
thread->st_statistics.st_xlog_cache_hit++;
406
block = block->xlb_next;
409
if (fetch_type == XT_FETCH_TEST) {
410
xt_unlock_mutex_ns(&seg->lcs_lock);
413
thread->st_statistics.st_xlog_cache_miss++;
417
/* Block not found: */
419
if (dcg->xlc_next_to_free->xlb_state != XLC_BLOCK_FREE) {
420
xt_unlock_mutex_ns(&seg->lcs_lock);
421
if (!xlog_free_block(dcg->xlc_next_to_free))
423
xt_lock_mutex_ns(&seg->lcs_lock);
426
xt_lock_mutex_ns(&dcg->xlc_lock);
427
block = dcg->xlc_next_to_free;
428
if (block->xlb_state != XLC_BLOCK_FREE) {
429
xt_unlock_mutex_ns(&dcg->xlc_lock);
432
dcg->xlc_next_to_free++;
433
if (dcg->xlc_next_to_free == dcg->xlc_blocks_end)
434
dcg->xlc_next_to_free = dcg->xlc_blocks;
435
dcg->xlc_free_count--;
437
if (fetch_type == XT_FETCH_READ) {
438
block->xlb_address = address;
439
block->xlb_log_id = log_id;
440
block->xlb_state = XLC_BLOCK_READING;
442
xt_unlock_mutex_ns(&dcg->xlc_lock);
444
/* Add the block to the hash table: */
445
block->xlb_next = seg->lcs_hash_table[hash_idx];
446
seg->lcs_hash_table[hash_idx] = block;
448
/* Read the block into memory: */
449
xt_unlock_mutex_ns(&seg->lcs_lock);
451
if (!xt_pread_file(file, address, XT_XLC_BLOCK_SIZE, 0, block->xlb_data, &red_size, &thread->st_statistics.st_xlog, thread))
453
memset(block->xlb_data + red_size, 0, XT_XLC_BLOCK_SIZE - red_size);
454
thread->st_statistics.st_xlog_cache_miss++;
456
xt_lock_mutex_ns(&seg->lcs_lock);
457
block->xlb_state = XLC_BLOCK_CLEAN;
458
xt_cond_wakeall(&seg->lcs_cond);
461
block->xlb_address = address;
462
block->xlb_log_id = log_id;
463
block->xlb_state = XLC_BLOCK_CLEAN;
464
memset(block->xlb_data, 0, XT_XLC_BLOCK_SIZE);
466
xt_unlock_mutex_ns(&dcg->xlc_lock);
468
/* Add the block to the hash table: */
469
block->xlb_next = seg->lcs_hash_table[hash_idx];
470
seg->lcs_hash_table[hash_idx] = block;
475
#ifdef DEBUG_CHECK_CACHE
476
//xt_xlog_check_cache();
481
static xtBool xlog_transfer_to_cache(XTOpenFilePtr file, xtLogID log_id, off_t offset, size_t size, xtWord1 *data, XTThreadPtr thread)
484
XTXLogBlockPtr block;
485
XTXLogCacheSegPtr seg;
488
xtBool read_block = FALSE;
490
#ifdef DEBUG_CHECK_CACHE
491
//xt_xlog_check_cache();
493
/* We have to read the first block, if we are
494
* not at the begining of the file:
498
address = offset & ~XT_XLC_BLOCK_MASK;
500
boff = (size_t) (offset - address);
501
tfer = XT_XLC_BLOCK_SIZE - boff;
505
if (!xlog_fetch_block(&block, file, log_id, address, &seg, read_block ? XT_FETCH_READ : XT_FETCH_BLANK, thread)) {
506
#ifdef DEBUG_CHECK_CACHE
507
//xt_xlog_check_cache();
511
ASSERT_NS(block && block->xlb_state == XLC_BLOCK_CLEAN);
512
memcpy(block->xlb_data + boff, data, tfer);
513
xt_unlock_mutex_ns(&seg->lcs_lock);
517
/* Following block need not be read
518
* because we always transfer to the
522
address += XT_XLC_BLOCK_SIZE;
526
if (tfer > XT_XLC_BLOCK_SIZE)
527
tfer = XT_XLC_BLOCK_SIZE;
529
#ifdef DEBUG_CHECK_CACHE
530
//xt_xlog_check_cache();
535
static xtBool xt_xlog_read(XTOpenFilePtr file, xtLogID log_id, off_t offset, size_t size, xtWord1 *data, xtBool load_cache, XTThreadPtr thread)
538
XTXLogBlockPtr block;
539
XTXLogCacheSegPtr seg;
543
#ifdef DEBUG_CHECK_CACHE
544
//xt_xlog_check_cache();
546
address = offset & ~XT_XLC_BLOCK_MASK;
547
boff = (size_t) (offset - address);
548
tfer = XT_XLC_BLOCK_SIZE - boff;
552
if (!xlog_fetch_block(&block, file, log_id, address, &seg, load_cache ? XT_FETCH_READ : XT_FETCH_TEST, thread))
557
if (!xt_pread_file(file, address + boff, size, 0, data, &red_size, &thread->st_statistics.st_xlog, thread))
559
memset(data + red_size, 0, size - red_size);
562
memcpy(data, block->xlb_data + boff, tfer);
563
xt_unlock_mutex_ns(&seg->lcs_lock);
566
address += XT_XLC_BLOCK_SIZE;
569
if (tfer > XT_XLC_BLOCK_SIZE)
570
tfer = XT_XLC_BLOCK_SIZE;
572
#ifdef DEBUG_CHECK_CACHE
573
//xt_xlog_check_cache();
578
static xtBool xt_xlog_write(XTOpenFilePtr file, xtLogID log_id, off_t offset, size_t size, xtWord1 *data, XTThreadPtr thread)
580
if (!xt_pwrite_file(file, offset, size, data, &thread->st_statistics.st_xlog, thread))
582
return xlog_transfer_to_cache(file, log_id, offset, size, data, thread);
586
* -----------------------------------------------------------------------
587
* D A T A B A S E T R A N S A C T I O N L O G S
590
void XTDatabaseLog::xlog_setup(XTThreadPtr self, XTDatabaseHPtr db, off_t inp_log_file_size, size_t transaction_buffer_size, int log_count)
592
volatile off_t log_file_size = inp_log_file_size;
596
memset(this, 0, sizeof(XTDatabaseLogRec));
600
else if (log_count > 1000000)
605
xl_log_file_threshold = xt_align_offset(log_file_size, 1024);
606
xl_log_file_count = log_count;
607
xl_size_of_buffers = transaction_buffer_size;
609
xt_init_mutex_with_autoname(self, &xl_write_lock);
610
xt_init_cond(self, &xl_write_cond);
611
#ifdef XT_XLOG_WAIT_SPINS
620
xt_spinlock_init_with_autoname(self, &xl_buffer_lock);
622
/* Note that we allocate a little bit more for each buffer
623
* in order to make sure that we can write a trailing record
626
log_size = transaction_buffer_size + sizeof(XTXactNewLogEntryDRec);
628
/* Add in order to round the buffer to an integral of 512 */
630
log_size += (512 - (log_size % 512));
633
xl_write_log_offset = 0;
634
xl_write_buf_pos = 0;
635
xl_write_buf_pos_start = 0;
636
xl_write_buffer = (xtWord1 *) xt_malloc(self, log_size);
637
xl_write_done = TRUE;
639
xl_append_log_id = 0;
640
xl_append_log_offset = 0;
641
xl_append_buf_pos = 0;
642
xl_append_buf_pos_start = 0;
643
xl_append_buffer = (xtWord1 *) xt_malloc(self, log_size);
645
xl_last_flush_time = 10;
647
xl_flush_log_offset = 0;
652
xt_enter_exception_handler(self, &e);
654
xt_exit_exception_handler(self, &e);
660
xtBool XTDatabaseLog::xlog_set_write_offset(xtLogID log_id, xtLogOffset log_offset, xtLogID max_log_id, XTThreadPtr thread)
662
xl_max_log_id = max_log_id;
664
xl_write_log_id = log_id;
665
xl_write_log_offset = log_offset;
666
xl_write_buf_pos = 0;
667
xl_write_buf_pos_start = 0;
668
xl_write_done = TRUE;
670
xl_append_log_id = log_id;
671
xl_append_log_offset = log_offset;
672
if (log_offset == 0) {
673
XTXactLogHeaderDPtr log_head;
675
log_head = (XTXactLogHeaderDPtr) xl_append_buffer;
676
memset(log_head, 0, sizeof(XTXactLogHeaderDRec));
677
log_head->xh_status_1 = XT_LOG_ENT_HEADER;
678
log_head->xh_checksum_1 = XT_CHECKSUM_1(log_id);
679
XT_SET_DISK_4(log_head->xh_size_4, sizeof(XTXactLogHeaderDRec));
680
XT_SET_DISK_4(log_head->xh_log_id_4, log_id);
681
XT_SET_DISK_2(log_head->xh_version_2, XT_LOG_VERSION_NO);
682
XT_SET_DISK_4(log_head->xh_magic_4, XT_LOG_FILE_MAGIC);
683
xl_append_buf_pos = sizeof(XTXactLogHeaderDRec);
684
xl_append_buf_pos_start = 0;
687
/* Start the log buffer at a block boundary: */
688
size_t buf_pos = (size_t) (log_offset % 512);
690
xl_append_buf_pos = buf_pos;
691
xl_append_buf_pos_start = buf_pos;
692
xl_append_log_offset = log_offset - buf_pos;
694
if (!xlog_open_log(log_id, log_offset, thread))
697
if (!xt_pread_file(xl_log_file, xl_append_log_offset, buf_pos, buf_pos, xl_append_buffer, NULL, &thread->st_statistics.st_xlog, thread))
701
xl_flush_log_id = log_id;
702
xl_flush_log_offset = log_offset;
706
void XTDatabaseLog::xlog_close(XTThreadPtr self)
709
xt_close_file(self, xl_log_file);
714
void XTDatabaseLog::xlog_exit(XTThreadPtr self)
716
xt_spinlock_free(self, &xl_buffer_lock);
717
xt_free_mutex(&xl_write_lock);
718
xt_free_cond(&xl_write_cond);
720
if (xl_write_buffer) {
721
xt_free_ns(xl_write_buffer);
722
xl_write_buffer = NULL;
724
if (xl_append_buffer) {
725
xt_free_ns(xl_append_buffer);
726
xl_append_buffer = NULL;
730
#define WR_NO_SPACE 1 /* Write because there is no space, or some other reason */
731
#define WR_FLUSH 2 /* Normal commit, write and flush */
733
xtBool XTDatabaseLog::xlog_flush(XTThreadPtr thread)
735
if (!xlog_flush_pending())
737
return xlog_append(thread, 0, NULL, 0, NULL, XT_XLOG_WRITE_AND_FLUSH, NULL, NULL);
740
xtBool XTDatabaseLog::xlog_flush_pending()
742
xtLogID req_flush_log_id;
743
xtLogOffset req_flush_log_offset;
745
xt_lck_slock(&xl_buffer_lock);
746
req_flush_log_id = xl_append_log_id;
747
req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
748
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
749
xt_spinlock_unlock(&xl_buffer_lock);
752
xt_spinlock_unlock(&xl_buffer_lock);
757
* Write data to the end of the log buffer.
759
* commit is set to true if the caller also requires
760
* the log to be flushed, after writing the data.
762
* This function returns the log ID and offset of
763
* the data write position.
765
xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, int flush_log_at_trx_commit, xtLogID *log_id, xtLogOffset *log_offset)
767
int write_reason = 0;
768
xtLogID req_flush_log_id;
769
xtLogOffset req_flush_log_offset;
774
/* The first size value must be set, of the second is set! */
775
ASSERT_NS(size1 || !size2);
778
/* Just flush the buffer... */
779
xt_lck_slock(&xl_buffer_lock);
780
write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE;
781
req_flush_log_id = xl_append_log_id;
782
req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
783
xt_spinlock_unlock(&xl_buffer_lock);
784
goto write_log_to_file;
786
req_flush_log_id = 0;
787
req_flush_log_offset = 0;
790
* This is a dirty read, which will send us to the
791
* best starting position:
793
* If there is space, now, then there is probably
794
* still enough space, after we have locked the
795
* buffer for writting.
797
if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
798
goto copy_to_log_buffer;
801
* There is not enough space in the append buffer.
802
* So we need to write the log, until there is space.
804
write_reason = WR_NO_SPACE;
808
/* We need to write for one of 2 reasons: not
809
* enough space in the buffer, or a flush
815
* The objective of the following code is to
816
* pick one writer, out of all threads.
817
* The rest will wait for the writer.
820
if (write_reason == WR_FLUSH) {
821
/* Before we flush, check if we should wait for running
822
* transactions that may commit shortly.
824
if (xl_db->db_xn_writer_count - xl_db->db_xn_writer_wait_count - xl_db->db_xn_long_running_count > 0 && xl_last_flush_time) {
825
/* Wait for about as long as the last flush took,
826
* the idea is to saturate the disk with flushing...: */
827
then = xt_trace_clock() + (xtWord8) xl_last_flush_time;
830
/* If a thread leaves this loop because times up, or
831
* a thread manages to flush so fast that this thread
832
* sleeps during this time, then it could be that
833
* the required flush occurs before other conditions
834
* of this loop are met!
836
* So we check here to make sure that the log has not been
837
* flushed as we require:
839
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
840
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
844
if (xl_db->db_xn_writer_count - xl_db->db_xn_writer_wait_count - xl_db->db_xn_long_running_count > 0)
846
if (xt_trace_clock() >= then)
852
#ifdef XT_XLOG_WAIT_SPINS
853
/* Spin for 1/1000s: */
854
then = xt_trace_clock() + (xtWord8) 1000;
856
if (!xt_atomic_tas4(&xt_writing, 1))
859
/* If I am not the writer, then I just waited for the
860
* writer. So it may be that my requirements have now
863
if (write_reason == WR_FLUSH) {
864
/* If the reason was to flush, then
865
* check the last flush sequence, maybe it is passed
866
* our required sequence.
868
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
869
/* The required flush position of the log is before
870
* or equal to the actual flush position. This means the condition
871
* for this thread have been satified (via group commit).
872
* Nothing more to do!
874
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
879
/* It may be that there is now space in the append buffer: */
880
if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
881
goto copy_to_log_buffer;
884
/* We are just writing the buffer! */
885
ASSERT_NS(write_reason == WR_NO_SPACE);
886
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0)
890
if (xt_trace_clock() >= then) {
891
xt_lock_mutex_ns(&xl_write_lock);
893
if (!xt_timed_wait_cond_ns(&xl_write_cond, &xl_write_lock, 500)) {
895
xt_unlock_mutex_ns(&xl_write_lock);
899
xt_unlock_mutex_ns(&xl_write_lock);
908
xt_lock_mutex_ns(&xl_write_lock);
910
if (!xt_timed_wait_cond_ns(&xl_write_cond, &xl_write_lock, 500)) {
911
xt_unlock_mutex_ns(&xl_write_lock);
919
xt_unlock_mutex_ns(&xl_write_lock);
922
/* If I am not the writer, then I just waited for the
923
* writer. So it may be that my requirements have now
926
if (write_reason == WR_FLUSH) {
927
/* If the reason was to flush, then
928
* check the last flush sequence, maybe it is passed
929
* our required sequence.
931
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
932
/* The required flush position of the log is before
933
* or equal to the actual flush position. This means the condition
934
* for this thread have been satified (via group commit).
935
* Nothing more to do!
937
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
942
/* It may be that there is now space in the append buffer: */
943
if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
944
goto copy_to_log_buffer;
947
/* We are just writing the buffer! */
948
ASSERT_NS(write_reason == WR_NO_SPACE);
949
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0)
953
goto write_log_to_file;
957
/* I am the writer, check the conditions, again: */
958
if (write_reason == WR_FLUSH) {
959
/* The writer wants the log to be flushed to a particular point: */
960
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
961
/* The writers required flush position is before or equal
962
* to the actual position, so the writer is done...
964
#ifdef XT_XLOG_WAIT_SPINS
967
xt_cond_wakeall(&xl_write_cond);
970
xt_cond_wakeall(&xl_write_cond);
972
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
975
/* Not flushed, but what about written? */
976
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) {
977
/* The write position is after or equal to the required flush
978
* position. This means that all we have to do is flush
979
* to satisfy the writers condition.
983
if (xl_log_id != xl_write_log_id)
984
ok = xlog_open_log(xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start), thread);
987
if (xl_db->db_co_busy) {
988
/* [(8)] Flush the compactor log. */
989
xt_lock_mutex_ns(&xl_db->db_co_dlog_lock);
990
ok = xl_db->db_co_thread->st_dlog_buf.dlb_flush_log(TRUE, thread);
991
xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
996
flush_time = thread->st_statistics.st_xlog.ts_flush_time;
997
if ((ok = xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread))) {
998
xl_last_flush_time = (u_int) (thread->st_statistics.st_xlog.ts_flush_time - flush_time);
999
xl_log_bytes_flushed = xl_log_bytes_written;
1001
xt_lock_mutex_ns(&xl_db->db_wr_lock);
1002
xl_flush_log_id = xl_write_log_id;
1003
xl_flush_log_offset = xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start);
1005
* We have written data to the log, wake the writer to commit
1006
* the data to the database.
1008
xlog_wr_log_written(xl_db);
1009
xt_unlock_mutex_ns(&xl_db->db_wr_lock);
1012
#ifdef XT_XLOG_WAIT_SPINS
1015
xt_cond_wakeall(&xl_write_cond);
1018
xt_cond_wakeall(&xl_write_cond);
1020
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
1025
/* If the amounf of data to be written is 0, then we are just required
1026
* to write the transaction buffer.
1028
* If there is space in the buffer, then we can go on
1029
* to copy our data into the buffer:
1031
if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) {
1032
#ifdef XT_XLOG_WAIT_SPINS
1035
xt_cond_wakeall(&xl_write_cond);
1038
xt_cond_wakeall(&xl_write_cond);
1040
goto copy_to_log_buffer;
1044
/* We are just writing the buffer! */
1045
ASSERT_NS(write_reason == WR_NO_SPACE);
1046
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) {
1047
#ifdef XT_XLOG_WAIT_SPINS
1050
xt_cond_wakeall(&xl_write_cond);
1053
xt_cond_wakeall(&xl_write_cond);
1060
/* If the current write buffer has been written, then
1061
* switch the logs. Otherwise we must try to existing
1064
if (xl_write_done) {
1065
/* This means that the current write buffer has been writen,
1068
xt_spinlock_lock(&xl_buffer_lock);
1069
xtWord1 *tmp_buffer = xl_write_buffer;
1071
/* The write position is now the append position: */
1072
xl_write_log_id = xl_append_log_id;
1073
xl_write_log_offset = xl_append_log_offset;
1074
xl_write_buf_pos = xl_append_buf_pos;
1075
xl_write_buf_pos_start = xl_append_buf_pos_start;
1076
xl_write_buffer = xl_append_buffer;
1077
xl_write_done = FALSE;
1079
/* We have to maintain 512 byte alignment: */
1080
ASSERT_NS((xl_write_log_offset % 512) == 0);
1081
part_size = xl_write_buf_pos % 512;
1083
memcpy(tmp_buffer, xl_write_buffer + xl_write_buf_pos - part_size, part_size);
1085
/* The new append position will be after the
1086
* current append position:
1088
xl_append_log_offset += xl_append_buf_pos - part_size;
1089
xl_append_buf_pos = part_size;
1090
xl_append_buf_pos_start = part_size;
1091
xl_append_buffer = tmp_buffer; // The old write buffer (which is empty)
1094
* If the append offset exceeds the log threshhold, then
1095
* we set the append buffer to a new log file:
1097
* NOTE: This algorithm will cause the log to be overwriten by a maximum
1098
* of the log buffer size!
1100
if (xl_append_log_offset >= xl_log_file_threshold) {
1101
XTXactNewLogEntryDPtr log_tail;
1102
XTXactLogHeaderDPtr log_head;
1106
/* Write the final record to the old log.
1107
* There is enough space for this because we allocate the
1108
* buffer a little bigger than required.
1110
log_tail = (XTXactNewLogEntryDPtr) (xl_write_buffer + xl_write_buf_pos);
1111
log_tail->xl_status_1 = XT_LOG_ENT_NEW_LOG;
1112
log_tail->xl_checksum_1 = XT_CHECKSUM_1(xl_append_log_id) ^ XT_CHECKSUM_1(xl_write_log_id);
1113
XT_SET_DISK_4(log_tail->xl_log_id_4, xl_append_log_id);
1114
xl_write_buf_pos += sizeof(XTXactNewLogEntryDRec);
1116
/* We add the header to the next log. */
1117
log_head = (XTXactLogHeaderDPtr) xl_append_buffer;
1118
memset(log_head, 0, sizeof(XTXactLogHeaderDRec));
1119
log_head->xh_status_1 = XT_LOG_ENT_HEADER;
1120
log_head->xh_checksum_1 = XT_CHECKSUM_1(xl_append_log_id);
1121
XT_SET_DISK_4(log_head->xh_size_4, sizeof(XTXactLogHeaderDRec));
1122
XT_SET_DISK_4(log_head->xh_log_id_4, xl_append_log_id);
1123
XT_SET_DISK_2(log_head->xh_version_2, XT_LOG_VERSION_NO);
1124
XT_SET_DISK_4(log_head->xh_magic_4, XT_LOG_FILE_MAGIC);
1126
xl_append_log_offset = 0;
1127
xl_append_buf_pos = sizeof(XTXactLogHeaderDRec);
1128
xl_append_buf_pos_start = 0;
1130
xt_spinlock_unlock(&xl_buffer_lock);
1131
/* We have completed the switch. The append buffer is empty, and
1132
* other threads can begin to write to it.
1134
* Meanwhile, this thread will write the write buffer...
1138
/* Make sure we have the correct log open: */
1139
if (xl_log_id != xl_write_log_id) {
1140
if (!xlog_open_log(xl_write_log_id, xl_write_log_offset, thread))
1144
/* Write the buffer. */
1145
/* Always write an integral number of 512 byte blocks: */
1146
ASSERT_NS((xl_write_log_offset % 512) == 0);
1147
if ((part_size = xl_write_buf_pos % 512)) {
1148
part_size = 512 - part_size;
1149
xl_write_buffer[xl_write_buf_pos] = XT_LOG_ENT_END_OF_LOG;
1150
#ifdef HAVE_valgrind
1152
memset(xl_write_buffer + xl_write_buf_pos + 1, 0x66, part_size - 1);
1154
if (!xt_pwrite_file(xl_log_file, xl_write_log_offset, xl_write_buf_pos+part_size, xl_write_buffer, &thread->st_statistics.st_xlog, thread))
1158
if (!xt_pwrite_file(xl_log_file, xl_write_log_offset, xl_write_buf_pos, xl_write_buffer, &thread->st_statistics.st_xlog, thread))
1162
/* This part has not been written: */
1163
part_size = xl_write_buf_pos - xl_write_buf_pos_start;
1165
/* We have written the data to the log, transfer
1166
* the buffer data into the cache. */
1167
if (!xlog_transfer_to_cache(xl_log_file, xl_log_id, xl_write_log_offset+xl_write_buf_pos_start, part_size, xl_write_buffer+xl_write_buf_pos_start, thread))
1170
xl_write_done = TRUE;
1171
xl_log_bytes_written += part_size;
1173
if (write_reason == WR_FLUSH) {
1174
if (xl_db->db_co_busy) {
1175
/* [(8)] Flush the compactor log. */
1176
xt_lock_mutex_ns(&xl_db->db_co_dlog_lock);
1177
if (!xl_db->db_co_thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
1178
xl_log_bytes_written -= part_size;
1179
xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
1182
xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
1185
/* And flush if required: */
1186
flush_time = thread->st_statistics.st_xlog.ts_flush_time;
1187
if (!xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread)) {
1188
xl_log_bytes_written -= part_size;
1191
xl_last_flush_time = (u_int) (thread->st_statistics.st_xlog.ts_flush_time - flush_time);
1193
xl_log_bytes_flushed = xl_log_bytes_written;
1195
xt_lock_mutex_ns(&xl_db->db_wr_lock);
1196
xl_flush_log_id = xl_write_log_id;
1197
xl_flush_log_offset = xl_write_log_offset + xl_write_buf_pos;
1199
* We have written data to the log, wake the writer to commit
1200
* the data to the database.
1202
xlog_wr_log_written(xl_db);
1203
xt_unlock_mutex_ns(&xl_db->db_wr_lock);
1205
/* Check that the require flush condition has arrived. */
1206
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) > 0)
1207
/* The required position is still after the current flush
1208
* position, continue writing: */
1211
#ifdef XT_XLOG_WAIT_SPINS
1214
xt_cond_wakeall(&xl_write_cond);
1217
xt_cond_wakeall(&xl_write_cond);
1219
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
1223
xlog_wr_log_written(xl_db);
1226
* Check that the buffer is now available, otherwise,
1227
* switch and write again!
1229
if (xl_append_buf_pos + size1 + size2 > xl_size_of_buffers)
1232
#ifdef XT_XLOG_WAIT_SPINS
1235
xt_cond_wakeall(&xl_write_cond);
1238
xt_cond_wakeall(&xl_write_cond);
1247
xt_spinlock_lock(&xl_buffer_lock);
1248
/* Now we have to check again. The check above was a dirty read!
1250
if (xl_append_buf_pos + size1 + size2 > xl_size_of_buffers) {
1251
xt_spinlock_unlock(&xl_buffer_lock);
1252
/* Not enough space, write the buffer, and return here. */
1253
write_reason = WR_NO_SPACE;
1254
goto write_log_to_file;
1257
memcpy(xl_append_buffer + xl_append_buf_pos, data1, size1);
1259
memcpy(xl_append_buffer + xl_append_buf_pos + size1, data2, size2);
1260
/* Add the log ID to the checksum!
1261
* This is required because log files are re-used, and we don't
1262
* want the records to be valid when the log is re-used.
1264
register XTXactLogBufferDPtr record;
1267
* Adjust db_xn_writer_count here. It is protected by
1270
record = (XTXactLogBufferDPtr) (xl_append_buffer + xl_append_buf_pos);
1271
switch (record->xh.xh_status_1) {
1272
case XT_LOG_ENT_HEADER:
1273
case XT_LOG_ENT_END_OF_LOG:
1275
case XT_LOG_ENT_REC_MODIFIED:
1276
case XT_LOG_ENT_UPDATE:
1277
case XT_LOG_ENT_UPDATE_BG:
1278
case XT_LOG_ENT_UPDATE_FL:
1279
case XT_LOG_ENT_UPDATE_FL_BG:
1280
case XT_LOG_ENT_INSERT:
1281
case XT_LOG_ENT_INSERT_BG:
1282
case XT_LOG_ENT_INSERT_FL:
1283
case XT_LOG_ENT_INSERT_FL_BG:
1284
case XT_LOG_ENT_DELETE:
1285
case XT_LOG_ENT_DELETE_BG:
1286
case XT_LOG_ENT_DELETE_FL:
1287
case XT_LOG_ENT_DELETE_FL_BG:
1288
sum = XT_GET_DISK_2(record->xu.xu_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id);
1289
XT_SET_DISK_2(record->xu.xu_checksum_2, sum);
1291
if (!thread->st_xact_writer) {
1292
thread->st_xact_writer = TRUE;
1293
thread->st_xact_write_time = xt_db_approximate_time;
1294
xl_db->db_xn_writer_count++;
1295
xl_db->db_xn_total_writer_count++;
1298
case XT_LOG_ENT_REC_REMOVED_BI:
1299
case XT_LOG_ENT_REC_REMOVED_BI_L:
1300
sum = XT_GET_DISK_2(record->rb.rb_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id);
1301
XT_SET_DISK_2(record->rb.rb_checksum_2, sum);
1303
case XT_LOG_ENT_ROW_NEW:
1304
case XT_LOG_ENT_ROW_NEW_FL:
1305
record->xl.xl_checksum_1 ^= XT_CHECKSUM_1(xl_append_log_id);
1307
if (!thread->st_xact_writer) {
1308
thread->st_xact_writer = TRUE;
1309
thread->st_xact_write_time = xt_db_approximate_time;
1310
xl_db->db_xn_writer_count++;
1311
xl_db->db_xn_total_writer_count++;
1314
case XT_LOG_ENT_COMMIT:
1315
case XT_LOG_ENT_ABORT:
1316
ASSERT_NS(thread->st_xact_writer);
1317
ASSERT_NS(xl_db->db_xn_writer_count > 0);
1318
if (thread->st_xact_writer) {
1319
xl_db->db_xn_writer_count--;
1320
thread->st_xact_writer = FALSE;
1321
if (thread->st_xact_long_running) {
1322
xl_db->db_xn_long_running_count--;
1323
thread->st_xact_long_running = FALSE;
1326
/* No break required! */
1328
record->xl.xl_checksum_1 ^= XT_CHECKSUM_1(xl_append_log_id);
1332
ASSERT_NS(xlog_verify(record, size1 + size2, xl_append_log_id));
1335
*log_id = xl_append_log_id;
1337
*log_offset = xl_append_log_offset + xl_append_buf_pos;
1338
xl_append_buf_pos += size1 + size2;
1339
if (flush_log_at_trx_commit != XT_XLOG_NO_WRITE_NO_FLUSH) {
1340
write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE;
1341
req_flush_log_id = xl_append_log_id;
1342
req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
1343
xt_spinlock_unlock(&xl_buffer_lock);
1344
/* We have written the data already! */
1347
goto write_log_to_file;
1350
// Failed sometime when outside the spinlock!
1351
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset + xl_append_buf_pos) <= 0);
1352
xt_spinlock_unlock(&xl_buffer_lock);
1357
#ifdef XT_XLOG_WAIT_SPINS
1360
xt_cond_wakeall(&xl_write_cond);
1363
xt_cond_wakeall(&xl_write_cond);
1369
* This function does not always delete the log. It may just rename a
1370
* log to a new log which it will need.
1371
* This speeds things up:
1373
* - No need to pre-allocate the new log.
1374
* - Log data is already flushed (i.e. disk blocks allocated)
1375
* - Log is already in OS cache.
1377
* However, it means that I need to checksum things differently
1378
* on each log to make sure I do not treat an old record
1381
* Return OK, FAILED or XT_ERR
1383
/*
 * Delete (or recycle) the given transaction log file.
 *
 * Depending on xt_db_offline_log_function, the log is either renamed to
 * become a future log (XT_RECYCLE_LOGS), kept (XT_KEEP_LOGS), or deleted.
 *
 * NOTE(review): this block is corrupted in extraction — interior lines
 * (locals such as new_log_id/ok, braces, return statements) are missing
 * and numeric artifact lines are interleaved. The comments below describe
 * only what the visible code shows; recover the full body from version
 * control before making functional changes.
 */
int XTDatabaseLog::xlog_delete_log(xtLogID del_log_id, XTThreadPtr thread)
1385
char path[PATH_MAX];
1387
/* Track the highest log ID seen so far: */
if (xl_max_log_id < xl_write_log_id)
1388
xl_max_log_id = xl_write_log_id;
1390
xlog_name(PATH_MAX, path, del_log_id);
1392
if (xt_db_offline_log_function == XT_RECYCLE_LOGS) {
1393
char new_path[PATH_MAX];
1397
/* Make sure that the total logs is less than or equal to the log file count
1398
* (plus dynamic component).
1400
while (xl_max_log_id - del_log_id + 1 <= (xl_log_file_count + xt_log_file_dyn_count) &&
1401
/* And the number of logs after the current log (including the current log)
1402
* must be less or equal to the log file count. */
1403
xl_max_log_id - xl_write_log_id + 1 <= xl_log_file_count) {
1404
new_log_id = xl_max_log_id+1;
1405
xlog_name(PATH_MAX, new_path, new_log_id);
1406
/* Recycle: rename the retired log so it becomes a pre-allocated future log. */
ok = xt_fs_rename(NULL, path, new_path);
1408
xl_max_log_id = new_log_id;
1411
if (!xt_fs_exists(new_path)) {
1412
/* Try again later: */
if (thread->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
1414
XT_FILE_IN_USE(thread->t_exception.e_sys_err))
1419
xl_max_log_id = new_log_id;
1423
if (xt_db_offline_log_function != XT_KEEP_LOGS) {
1424
if (!xt_fs_delete(NULL, path)) {
1425
/* A file-in-use system error presumably means "retry later" — confirm against full source. */
if (thread->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
1426
XT_FILE_IN_USE(thread->t_exception.e_sys_err))
1437
/* PRIVATE FUNCTIONS */
1438
/*
 * Open (and if necessary create and pre-allocate) the transaction log with
 * the given ID, making it the current write log (xl_log_file/xl_log_id).
 *
 * If the log already exists it counts as a recycled file (good); a newly
 * created file means the recycle pool was too small, so the dynamic log
 * file count is increased. New logs are pre-written with zero blocks up to
 * xl_log_file_threshold (completely when PREWRITE_LOG_COMPLETELY is set).
 *
 * NOTE(review): block is corrupted in extraction — locals (eof, buffer,
 * tfer), braces and return statements are missing, and numeric artifact
 * lines are interleaved. Recover the full body from version control before
 * making functional changes.
 */
xtBool XTDatabaseLog::xlog_open_log(xtLogID log_id, off_t curr_write_pos, XTThreadPtr thread)
1440
char log_path[PATH_MAX];
1443
/* Already the current log — nothing to do (presumably returns OK). */
if (xl_log_id == log_id)
1447
if (!xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread))
1449
xt_close_file_ns(xl_log_file);
1454
xlog_name(PATH_MAX, log_path, log_id);
1455
if (!(xl_log_file = xt_open_file_ns(log_path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 16*1024*1024)))
1457
/* Allocate space until the required size: */
1458
if (curr_write_pos < xl_log_file_threshold) {
1459
eof = xt_seek_eof_file(NULL, xl_log_file);
1461
/* A new file (bad), we need a greater file count: */
1462
xt_log_file_dyn_count++;
1463
xt_log_file_dyn_dec = 4;
1466
/* An existing file (good): */
1467
if (xt_log_file_dyn_count > 0) {
1468
if (xt_log_file_dyn_dec > 0)
1469
xt_log_file_dyn_dec--;
1471
xt_log_file_dyn_count--;
1474
if (eof < xl_log_file_threshold) {
1478
memset(buffer, 0, 2048);
1480
/* Keep writes aligned to 512-byte device blocks: */
curr_write_pos = xt_align_offset(curr_write_pos, 512);
1481
#ifdef PREWRITE_LOG_COMPLETELY
1482
while (curr_write_pos < xl_log_file_threshold) {
1484
if ((off_t) tfer > xl_log_file_threshold - curr_write_pos)
1485
tfer = (size_t) (xl_log_file_threshold - curr_write_pos);
1486
if (curr_write_pos == 0)
1487
*buffer = XT_LOG_ENT_END_OF_LOG;
1488
if (!xt_pwrite_file(xl_log_file, curr_write_pos, tfer, buffer, &thread->st_statistics.st_xlog, thread))
1491
curr_write_pos += tfer;
1494
if (curr_write_pos < xl_log_file_threshold) {
1497
if (curr_write_pos < xl_log_file_threshold - 2048)
1498
curr_write_pos = xl_log_file_threshold - 2048;
1499
if ((off_t) tfer > xl_log_file_threshold - curr_write_pos)
1500
tfer = (size_t) (xl_log_file_threshold - curr_write_pos);
1501
if (!xt_pwrite_file(xl_log_file, curr_write_pos, tfer, buffer, &thread->st_statistics.st_xlog, thread))
1506
/* Recycled file much larger than needed — truncate back to the threshold: */
else if (eof > xl_log_file_threshold + (128 * 1024 * 1024)) {
1507
if (!xt_set_eof_file(NULL, xl_log_file, xl_log_file_threshold))
1515
void XTDatabaseLog::xlog_name(size_t size, char *path, xtLogID log_id)
1519
sprintf(name, "xlog-%lu.xt", (u_long) log_id);
1520
xt_strcpy(size, path, xl_db->db_main_path);
1521
xt_add_system_dir(size, path);
1522
xt_add_dir_char(size, path);
1523
xt_strcat(size, path, name);
1527
* -----------------------------------------------------------------------
1528
* T H R E A D T R A N S A C T I O N B U F F E R
1531
/*
 * Flush the transaction log of the given database.
 *
 * Thin public wrapper that forwards to XTDatabaseLog::xlog_flush().
 *
 * @param db      Database whose transaction log is flushed.
 * @param thread  Calling thread (carries statistics and error state).
 * @return result of xlog_flush() (presumably OK/FAILED).
 *
 * Reconstructed: the original block lost its braces to extraction corruption.
 */
xtPublic xtBool xt_xlog_flush_log(struct XTDatabase *db, XTThreadPtr thread)
{
	return db->db_xlog.xlog_flush(thread);
}
1536
/*
 * Append a pre-built log entry to the current thread's database log.
 *
 * Thin public wrapper around XTDatabaseLog::xlog_append() with no second
 * data segment and no begin-position output.
 *
 * @param thread                   Calling thread; its st_database's log is used.
 * @param size                     Size of the log entry in bytes.
 * @param log_entry                The entry to append.
 * @param flush_log_at_trx_commit  Write/flush policy passed through to xlog_append().
 * @return result of xlog_append() (presumably OK/FAILED).
 *
 * Reconstructed: the original block lost its braces to extraction corruption.
 */
xtPublic xtBool xt_xlog_log_data(XTThreadPtr thread, size_t size, XTXactLogBufferDPtr log_entry, int flush_log_at_trx_commit)
{
	return thread->st_database->db_xlog.xlog_append(thread, size, (xtWord1 *) log_entry, 0, NULL, flush_log_at_trx_commit, NULL, NULL);
}
1541
/* Allocate a record from the free list. */
1542
/*
 * Build a table-modification log entry of the given 'status' (entry type),
 * checksum it, and append it (plus the trailing 'data' payload) to the
 * current thread's transaction log.
 *
 * The big switch fills the type-specific fixed header union member
 * (xu/xf/fr/rb/bl/xw/xa/wr/xp), computes 'len' (fixed header size), and
 * folds type-specific fields into the running checksum 'sum'. The checksum
 * over the payload uses a shift-and-fold hash; the log ID is mixed in later
 * by xlog_append so records from a recycled log cannot validate.
 *
 * NOTE(review): this block is corrupted in extraction — the switch opener,
 * 'break' statements, several locals (len, sum, g, c, check_size) and
 * braces are missing, and numeric artifact lines are interleaved. Comments
 * describe only what the visible code shows; recover the full body from
 * version control before making functional changes.
 */
xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo op_seq, xtWord1 new_rec_type, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *data, XTThreadPtr thread)
1544
XTXactLogBufferDRec log_entry;
1548
XTXactDataPtr xact = NULL;
1549
int flush_log_at_trx_commit = XT_XLOG_NO_WRITE_NO_FLUSH;
1552
case XT_LOG_ENT_REC_MODIFIED:
1553
case XT_LOG_ENT_UPDATE:
1554
case XT_LOG_ENT_INSERT:
1555
case XT_LOG_ENT_DELETE:
1557
XT_SET_DISK_4(log_entry.xu.xu_op_seq_4, op_seq);
1558
XT_SET_DISK_4(log_entry.xu.xu_tab_id_4, tab_id);
1559
XT_SET_DISK_4(log_entry.xu.xu_rec_id_4, rec_id);
1560
XT_SET_DISK_2(log_entry.xu.xu_size_2, size);
1561
len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
1562
/* First modification of this transaction: record its begin position. */
if (!(thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
1565
xact = thread->st_xact_data;
1566
xact->xd_flags |= XT_XN_XAC_LOGGED;
1569
case XT_LOG_ENT_UPDATE_FL:
1570
case XT_LOG_ENT_INSERT_FL:
1571
case XT_LOG_ENT_DELETE_FL:
1573
XT_SET_DISK_4(log_entry.xf.xf_op_seq_4, op_seq);
1574
XT_SET_DISK_4(log_entry.xf.xf_tab_id_4, tab_id);
1575
XT_SET_DISK_4(log_entry.xf.xf_rec_id_4, rec_id);
1576
XT_SET_DISK_2(log_entry.xf.xf_size_2, size);
1577
XT_SET_DISK_4(log_entry.xf.xf_free_rec_id_4, free_rec_id);
1578
sum ^= XT_CHECKSUM4_REC(free_rec_id);
1579
len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1);
1580
if (!(thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
1583
xact = thread->st_xact_data;
1584
xact->xd_flags |= XT_XN_XAC_LOGGED;
1587
case XT_DEFUNKT_REC_FREED:
1588
case XT_DEFUNKT_REC_REMOVED:
1589
case XT_DEFUNKT_REC_REMOVED_EXT:
1590
ASSERT_NS(size == 1 + XT_XACT_ID_SIZE + sizeof(XTTabRecFreeDRec));
1591
XT_SET_DISK_4(log_entry.fr.fr_op_seq_4, op_seq);
1592
XT_SET_DISK_4(log_entry.fr.fr_tab_id_4, tab_id);
1593
XT_SET_DISK_4(log_entry.fr.fr_rec_id_4, rec_id);
1594
len = offsetof(XTactFreeRecEntryDRec, fr_stat_id_1);
1596
case XT_LOG_ENT_REC_REMOVED_BI:
1598
XT_SET_DISK_4(log_entry.rb.rb_op_seq_4, op_seq);
1599
XT_SET_DISK_4(log_entry.rb.rb_tab_id_4, tab_id);
1600
XT_SET_DISK_4(log_entry.rb.rb_rec_id_4, rec_id);
1601
XT_SET_DISK_2(log_entry.rb.rb_size_2, size);
1602
log_entry.rb.rb_new_rec_type_1 = new_rec_type;
1603
sum ^= XT_CHECKSUM4_REC((xtWord4) new_rec_type);
1604
len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1);
1606
case XT_LOG_ENT_REC_REMOVED_BI_L:
1608
XT_SET_DISK_4(log_entry.bl.bl_op_seq_4, op_seq);
1609
XT_SET_DISK_4(log_entry.bl.bl_tab_id_4, tab_id);
1610
XT_SET_DISK_4(log_entry.bl.bl_rec_id_4, rec_id);
1611
XT_SET_DISK_2(log_entry.bl.bl_size_2, size);
1612
XT_SET_DISK_4(log_entry.bl.bl_prev_rec_id_4, free_rec_id);
1613
sum ^= XT_CHECKSUM4_REC(free_rec_id);
1614
log_entry.bl.bl_new_rec_type_1 = new_rec_type;
1615
sum ^= XT_CHECKSUM4_REC((xtWord4) new_rec_type);
1616
len = offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1);
1618
case XT_LOG_ENT_REC_MOVED:
1619
ASSERT_NS(size == 8);
1620
XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
1621
XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
1622
XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
1623
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
1625
case XT_LOG_ENT_REC_CLEANED:
1626
ASSERT_NS(size == offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE);
1627
XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
1628
XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
1629
XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
1630
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
1632
case XT_LOG_ENT_REC_CLEANED_1:
1633
ASSERT_NS(size == 1);
1634
XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
1635
XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
1636
XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
1637
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
1639
case XT_LOG_ENT_REC_UNLINKED:
1640
ASSERT_NS(size == offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE);
1641
XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
1642
XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
1643
XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
1644
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
1646
case XT_LOG_ENT_ROW_NEW:
1647
ASSERT_NS(size == 0);
1648
XT_SET_DISK_4(log_entry.xa.xa_op_seq_4, op_seq);
1649
XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab_id);
1650
XT_SET_DISK_4(log_entry.xa.xa_row_id_4, rec_id);
1651
len = offsetof(XTactRowAddedEntryDRec, xa_row_id_4) + XT_ROW_ID_SIZE;
1653
case XT_LOG_ENT_ROW_NEW_FL:
1654
ASSERT_NS(size == 0);
1655
XT_SET_DISK_4(log_entry.xa.xa_op_seq_4, op_seq);
1656
XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab_id);
1657
XT_SET_DISK_4(log_entry.xa.xa_row_id_4, rec_id);
1658
XT_SET_DISK_4(log_entry.xa.xa_free_list_4, free_rec_id);
1659
sum ^= XT_CHECKSUM4_REC(free_rec_id);
1660
len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4) + XT_ROW_ID_SIZE;
1662
case XT_LOG_ENT_ROW_ADD_REC:
1663
case XT_LOG_ENT_ROW_SET:
1664
case XT_LOG_ENT_ROW_FREED:
1665
ASSERT_NS(size == sizeof(XTTabRowRefDRec));
1666
XT_SET_DISK_4(log_entry.wr.wr_op_seq_4, op_seq);
1667
XT_SET_DISK_4(log_entry.wr.wr_tab_id_4, tab_id);
1668
XT_SET_DISK_4(log_entry.wr.wr_row_id_4, rec_id);
1669
len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4);
1671
case XT_LOG_ENT_PREPARE:
1673
/* NOTE(review): for PREPARE, op_seq carries the transaction ID — confirm against callers. */
XT_SET_DISK_4(log_entry.xp.xp_xact_id_4, op_seq);
1674
log_entry.xp.xp_xa_len_1 = (xtWord1) size;
1675
len = offsetof(XTXactPrepareEntryDRec, xp_xa_data);
1676
flush_log_at_trx_commit = xt_db_flush_log_at_trx_commit;
1684
xtWord1 *dptr = data;
1687
sum ^= op_seq ^ (tab_id << 8) ^ XT_CHECKSUM4_REC(rec_id);
1688
if ((g = sum & 0xF0000000)) {
1689
sum = sum ^ (g >> 24);
1692
/* Fold the variable-length payload into the checksum: */
for (u_int i=0; i<(u_int) size; i++) {
1693
sum = (sum << 4) + *dptr;
1694
if ((g = sum & 0xF0000000)) {
1695
sum = sum ^ (g >> 24);
1701
log_entry.xh.xh_status_1 = status;
1702
if (check_size == 1) {
1703
log_entry.xh.xh_checksum_1 = XT_CHECKSUM_1(sum);
1708
c = XT_CHECKSUM_2(sum);
1709
XT_SET_DISK_2(log_entry.xu.xu_checksum_2, c);
1711
#ifdef PRINT_TABLE_MODIFICATIONS
1712
xt_print_log_record(0, 0, &log_entry);
1715
return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, &xact->xd_begin_log, &xact->xd_begin_offset);
1717
return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, NULL, NULL);
1721
* -----------------------------------------------------------------------
1722
* S E Q U E N T I A L L O G R E A D I N G
1726
* Use the log buffer for sequential reading the log.
1728
/*
 * Initialize a sequential-read handle over the transaction log.
 *
 * Allocates the read buffer and resets the file, buffer and record
 * positions to "nothing read yet".
 *
 * @param seq          The sequential read state to initialize.
 * @param buffer_size  Size of the read buffer to allocate.
 * @param load_cache   Whether reads through this handle should populate the log cache.
 * @return TRUE if the buffer allocation succeeded, FALSE otherwise.
 *
 * Reconstructed: the original block lost its braces to extraction corruption.
 */
xtBool XTDatabaseLog::xlog_seq_init(XTXactSeqReadPtr seq, size_t buffer_size, xtBool load_cache)
{
	seq->xseq_buffer_size = buffer_size;
	seq->xseq_load_cache = load_cache;

	/* No log file open yet: */
	seq->xseq_log_id = 0;
	seq->xseq_log_file = NULL;
	seq->xseq_log_eof = 0;

	/* Buffer is empty: */
	seq->xseq_buf_log_offset = 0;
	seq->xseq_buffer_len = 0;
	seq->xseq_buffer = (xtWord1 *) xt_malloc_ns(buffer_size);

	/* No current record: */
	seq->xseq_rec_log_id = 0;
	seq->xseq_rec_log_offset = 0;
	seq->xseq_record_len = 0;

	return seq->xseq_buffer != NULL;
}
1748
/*
 * Tear down a sequential-read handle.
 *
 * Closes any open log file (via xlog_seq_close) and frees the read buffer.
 * Safe to call more than once: the buffer pointer is NULLed after freeing.
 *
 * @param seq  The sequential read state to dispose of.
 *
 * Reconstructed: the original block lost its closing braces to extraction corruption.
 */
void XTDatabaseLog::xlog_seq_exit(XTXactSeqReadPtr seq)
{
	xlog_seq_close(seq);
	if (seq->xseq_buffer) {
		xt_free_ns(seq->xseq_buffer);
		seq->xseq_buffer = NULL;
	}
}
1757
/*
 * Close the log file currently open in a sequential-read handle.
 *
 * Resets xseq_log_id and xseq_log_eof so a later read re-opens and
 * re-probes the file. The read buffer is left untouched (see xlog_seq_exit).
 *
 * @param seq  The sequential read state whose file is closed.
 *
 * Reconstructed: the original block lost its braces to extraction corruption.
 */
void XTDatabaseLog::xlog_seq_close(XTXactSeqReadPtr seq)
{
	if (seq->xseq_log_file) {
		xt_close_file_ns(seq->xseq_log_file);
		seq->xseq_log_file = NULL;
	}
	seq->xseq_log_id = 0;
	seq->xseq_log_eof = 0;
}
1767
/*
 * Position a sequential-read handle at the given log ID and offset.
 *
 * Invalidates the read buffer when switching to a different log, and closes
 * the previously open file so the next read opens the correct one.
 *
 * @param seq         The sequential read state to (re)position.
 * @param log_id      Log to read from next.
 * @param log_offset  Offset within that log.
 * @param missing_ok  Unused here (accepted for interface compatibility).
 * @return OK (the repositioning itself cannot fail).
 *
 * Reconstructed: braces, part of a comment and the return statement were
 * lost to extraction corruption.
 */
xtBool XTDatabaseLog::xlog_seq_start(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, xtBool XT_UNUSED(missing_ok))
{
	if (seq->xseq_rec_log_id != log_id) {
		seq->xseq_rec_log_id = log_id;
		seq->xseq_buf_log_offset = seq->xseq_rec_log_offset;
		seq->xseq_buffer_len = 0;
	}

	/* Windows version: this will help to switch
	 * to the new log file.
	 * Due to reading from the log buffers, this was
	 * not always done.
	 */
	if (seq->xseq_log_id != log_id) {
		if (seq->xseq_log_file) {
			xt_close_file_ns(seq->xseq_log_file);
			seq->xseq_log_file = NULL;
		}
	}

	seq->xseq_rec_log_offset = log_offset;
	seq->xseq_record_len = 0;
	return OK;
}
1791
/*
 * Estimate the number of log bytes between the writer thread's current
 * position (db_wr_log_id/db_wr_log_offset) and the flush position
 * (xl_flush_log_id/xl_flush_log_offset), i.e. how much flushed log data
 * the writer still has to apply.
 *
 * Intermediate logs are assumed to be exactly xt_db_log_file_threshold
 * bytes long, so the result is an estimate, not an exact count.
 *
 * @return the estimated byte count (0 if the writer has caught up).
 *
 * Reconstructed: locals, loop increments, braces and the return statement
 * were lost to extraction corruption.
 */
size_t XTDatabaseLog::xlog_bytes_to_write()
{
	xtLogID		log_id;
	xtLogOffset	log_offset;
	xtLogID		to_log_id;
	xtLogOffset	to_log_offset;
	size_t		byte_count = 0;

	log_id = xl_db->db_wr_log_id;
	log_offset = xl_db->db_wr_log_offset;
	to_log_id = xl_db->db_xlog.xl_flush_log_id;
	to_log_offset = xl_db->db_xlog.xl_flush_log_offset;

	/* Assume the logs have the threshold: */
	if (log_id < to_log_id) {
		/* Remainder of the writer's current log: */
		if (log_offset < xt_db_log_file_threshold)
			byte_count = (size_t) (xt_db_log_file_threshold - log_offset);
		log_offset = 0;
		log_id++;
	}
	/* Whole logs in between: */
	while (log_id < to_log_id) {
		byte_count += (size_t) xt_db_log_file_threshold;
		log_id++;
	}
	/* Partial data in the flush log: */
	if (log_offset < to_log_offset)
		byte_count += (size_t) (to_log_offset - log_offset);

	return byte_count;
}
1821
/*
 * Read log data through the log cache, opening the required log file in
 * the sequential-read handle if it is not already open.
 *
 * 'eof' may be passed as 0, in which case the file's end-of-file position
 * is probed (once) and cached in xseq_log_eof; reads are clipped to EOF.
 *
 * NOTE(review): this block is corrupted in extraction — error returns, the
 * missing-file path and the *data_read updates are missing, and numeric
 * artifact lines are interleaved. Recover the full body from version
 * control before making functional changes.
 */
xtBool XTDatabaseLog::xlog_read_from_cache(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, size_t size, off_t eof, xtWord1 *buffer, size_t *data_read, XTThreadPtr thread)
1823
/* xseq_log_file could be NULL because xseq_log_id is not set
1824
* to zero when xseq_log_file is set to NULL!
1825
* This bug caused a crash in TeamDrive.
1827
if (seq->xseq_log_id != log_id || !seq->xseq_log_file) {
1828
char path[PATH_MAX];
1830
if (seq->xseq_log_file) {
1831
xt_close_file_ns(seq->xseq_log_file);
1832
seq->xseq_log_file = NULL;
1835
xlog_name(PATH_MAX, path, log_id);
1836
/* XT_FS_MISSING_OK: a missing log is not an error here. */
if (!xt_open_file_ns(&seq->xseq_log_file, path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))
1838
if (!seq->xseq_log_file) {
1843
seq->xseq_log_id = log_id;
1844
seq->xseq_log_eof = 0;
1848
/* Probe and cache the file's EOF on first use: */
if (!seq->xseq_log_eof)
1849
seq->xseq_log_eof = xt_seek_eof_file(NULL, seq->xseq_log_file);
1850
eof = seq->xseq_log_eof;
1853
if (log_offset >= eof) {
1859
/* Clip the read to the end of the file: */
if ((off_t) size > eof - log_offset)
1860
size = (size_t) (eof - log_offset);
1864
return xt_xlog_read(seq->xseq_log_file, seq->xseq_log_id, log_offset, size, buffer, seq->xseq_load_cache, thread);
1867
/*
 * Random-access read of log data: serve the request from the log cache,
 * the in-memory write buffer, or the append buffer, whichever holds the
 * requested range. The fast path avoids taking the buffer spinlock when
 * the range is known to lie entirely before the write position.
 *
 * NOTE(review): this block is corrupted in extraction — the
 * 'unlock_and_return' label, some *data_read assignments, locals (tfer)
 * and braces are missing, and numeric artifact lines are interleaved.
 * Comments describe only what the visible code shows; recover the full
 * body from version control before making functional changes.
 */
xtBool XTDatabaseLog::xlog_rnd_read(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, size_t size, xtWord1 *buffer, size_t *data_read, XTThreadPtr thread)
1869
/* Fast track to reading from cache: */
1870
if (log_id < xl_write_log_id)
1871
return xlog_read_from_cache(seq, log_id, log_offset, size, 0, buffer, data_read, thread);
1873
if (log_id == xl_write_log_id && log_offset + (xtLogOffset) size <= xl_write_log_offset)
1874
return xlog_read_from_cache(seq, log_id, log_offset, size, xl_write_log_offset, buffer, data_read, thread);
1876
/* May be in the log write or append buffer: */
1877
xt_lck_slock(&xl_buffer_lock);
1879
/* Re-check under the lock (the fast-path check was a dirty read): */
if (log_id < xl_write_log_id) {
1880
xt_spinlock_unlock(&xl_buffer_lock);
1881
return xlog_read_from_cache(seq, log_id, log_offset, size, 0, buffer, data_read, thread);
1884
/* Check the write buffer: */
1885
if (log_id == xl_write_log_id) {
1886
if (log_offset + (xtLogOffset) size <= xl_write_log_offset) {
1887
xt_spinlock_unlock(&xl_buffer_lock);
1888
return xlog_read_from_cache(seq, log_id, log_offset, size, xl_write_log_offset, buffer, data_read, thread);
1891
if (log_offset < xl_write_log_offset + (xtLogOffset) xl_write_buf_pos) {
1892
/* Reading partially from the write buffer: */
1893
if (log_offset >= xl_write_log_offset) {
1894
/* Completely in the buffer. */
1895
off_t offset = log_offset - xl_write_log_offset;
1897
if (size > xl_write_buf_pos - offset)
1898
size = (size_t) (xl_write_buf_pos - offset);
1900
memcpy(buffer, xl_write_buffer + offset, size);
1903
goto unlock_and_return;
1906
/* End part in the buffer: */
1909
/* The amount that will be taken from the cache: */
1910
tfer = (size_t) (xl_write_log_offset - log_offset);
1913
if (size > xl_write_buf_pos)
1914
size = xl_write_buf_pos;
1916
memcpy(buffer + tfer, xl_write_buffer, size);
1918
xt_spinlock_unlock(&xl_buffer_lock);
1920
/* Read the first part from the cache: */
1922
*data_read = tfer + size;
1923
return xlog_read_from_cache(seq, log_id, log_offset, tfer, log_offset + tfer, buffer, NULL, thread);
1927
/* Check the append buffer: */
1928
if (log_id == xl_append_log_id) {
1929
if (log_offset >= xl_append_log_offset && log_offset < xl_append_log_offset + (xtLogOffset) xl_append_buf_pos) {
1930
/* It is in the append buffer: */
1931
size_t offset = (size_t) (log_offset - xl_append_log_offset);
1933
if (size > xl_append_buf_pos - offset)
1934
size = xl_append_buf_pos - offset;
1936
memcpy(buffer, xl_append_buffer + offset, size);
1939
goto unlock_and_return;
1943
if (xl_append_log_id == 0) {
1944
/* This catches the case that
1945
* the log has not yet been initialized
1948
xt_spinlock_unlock(&xl_buffer_lock);
1949
return xlog_read_from_cache(seq, log_id, log_offset, size, 0, buffer, data_read, thread);
1956
xt_spinlock_unlock(&xl_buffer_lock);
1960
/*
 * Write data directly through to the log file at the handle's current
 * record position, then advance that position and the written-bytes
 * counter.
 *
 * @param seq     Sequential handle; its open file and record offset are used.
 * @param size    Number of bytes to write.
 * @param data    The bytes to write.
 * @param thread  Calling thread (statistics/error state).
 * @return OK on success, FAILED if the underlying write fails.
 *
 * Reconstructed: braces and the return statements were lost to extraction
 * corruption.
 */
xtBool XTDatabaseLog::xlog_write_thru(XTXactSeqReadPtr seq, size_t size, xtWord1 *data, XTThreadPtr thread)
{
	if (!xt_xlog_write(seq->xseq_log_file, seq->xseq_log_id, seq->xseq_rec_log_offset, size, data, thread))
		return FAILED;
	xl_log_bytes_written += size;
	seq->xseq_rec_log_offset += size;
	return OK;
}
1969
/*
 * Verify the checksum of a log record read from log 'log_id'.
 *
 * The log ID is mixed into every checksum (see the writer side) so that
 * stale records left over in a recycled log file do not validate. Fixed
 * small records are verified directly per type; records with a payload
 * fall through to the shift-and-fold hash over (op_seq, tab_id, rec_id,
 * payload), compared against a 1- or 2-byte stored checksum depending on
 * 'check_size'.
 *
 * NOTE(review): this block is corrupted in extraction — the switch braces,
 * 'break'/'return' statements, locals (sum, g, dptr, op_seq, tab_id,
 * check_size) and the final returns are missing, and numeric artifact
 * lines are interleaved. Comments describe only what the visible code
 * shows; recover the full body from version control before editing.
 */
xtBool XTDatabaseLog::xlog_verify(XTXactLogBufferDPtr record, size_t rec_size, xtLogID log_id)
1974
xtRecordID rec_id, free_rec_id;
1979
switch (record->xh.xh_status_1) {
1980
case XT_LOG_ENT_HEADER:
1981
if (record->xh.xh_checksum_1 != XT_CHECKSUM_1(log_id))
1983
if (XT_LOG_HEAD_MAGIC(record, rec_size) != XT_LOG_FILE_MAGIC)
1985
if (rec_size >= offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
1986
if (XT_GET_DISK_4(record->xh.xh_log_id_4) != log_id)
1990
case XT_LOG_ENT_NEW_LOG:
1991
case XT_LOG_ENT_DEL_LOG:
1992
return record->xl.xl_checksum_1 == (XT_CHECKSUM_1(XT_GET_DISK_4(record->xl.xl_log_id_4)) ^ XT_CHECKSUM_1(log_id));
1993
case XT_LOG_ENT_NEW_TAB:
1994
return record->xl.xl_checksum_1 == (XT_CHECKSUM_1(XT_GET_DISK_4(record->xt.xt_tab_id_4)) ^ XT_CHECKSUM_1(log_id));
1995
case XT_LOG_ENT_COMMIT:
1996
case XT_LOG_ENT_ABORT:
1997
sum = XT_CHECKSUM4_XACT(XT_GET_DISK_4(record->xe.xe_xact_id_4)) ^ XT_CHECKSUM4_XACT(XT_GET_DISK_4(record->xe.xe_not_used_4));
1998
return record->xe.xe_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
1999
case XT_LOG_ENT_CLEANUP:
2000
sum = XT_CHECKSUM4_XACT(XT_GET_DISK_4(record->xc.xc_xact_id_4));
2001
return record->xc.xc_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
2002
case XT_LOG_ENT_REC_MODIFIED:
2003
case XT_LOG_ENT_UPDATE:
2004
case XT_LOG_ENT_INSERT:
2005
case XT_LOG_ENT_DELETE:
2006
case XT_LOG_ENT_UPDATE_BG:
2007
case XT_LOG_ENT_INSERT_BG:
2008
case XT_LOG_ENT_DELETE_BG:
2010
op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);
2011
tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
2012
rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
2013
dptr = &record->xu.xu_rec_type_1;
2014
rec_size -= offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
2016
case XT_LOG_ENT_UPDATE_FL:
2017
case XT_LOG_ENT_INSERT_FL:
2018
case XT_LOG_ENT_DELETE_FL:
2019
case XT_LOG_ENT_UPDATE_FL_BG:
2020
case XT_LOG_ENT_INSERT_FL_BG:
2021
case XT_LOG_ENT_DELETE_FL_BG:
2023
op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);
2024
tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
2025
rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
2026
free_rec_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);
2027
sum ^= XT_CHECKSUM4_REC(free_rec_id);
2028
dptr = &record->xf.xf_rec_type_1;
2029
rec_size -= offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1);
2031
case XT_DEFUNKT_REC_FREED:
2032
case XT_DEFUNKT_REC_REMOVED:
2033
case XT_DEFUNKT_REC_REMOVED_EXT:
2034
op_seq = XT_GET_DISK_4(record->fr.fr_op_seq_4);
2035
tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);
2036
rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
2037
dptr = &record->fr.fr_stat_id_1;
2038
rec_size -= offsetof(XTactFreeRecEntryDRec, fr_stat_id_1);
2040
case XT_LOG_ENT_REC_REMOVED_BI:
2042
op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);
2043
tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
2044
rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
2045
free_rec_id = (xtWord4) record->rb.rb_new_rec_type_1;
2046
sum ^= XT_CHECKSUM4_REC(free_rec_id);
2047
dptr = &record->rb.rb_rec_type_1;
2048
rec_size -= offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1);
2050
case XT_LOG_ENT_REC_REMOVED_BI_L:
2052
op_seq = XT_GET_DISK_4(record->bl.bl_op_seq_4);
2053
tab_id = XT_GET_DISK_4(record->bl.bl_tab_id_4);
2054
rec_id = XT_GET_DISK_4(record->bl.bl_rec_id_4);
2055
free_rec_id = XT_GET_DISK_4(record->bl.bl_prev_rec_id_4);
2056
sum ^= XT_CHECKSUM4_REC(free_rec_id);
2057
free_rec_id = (xtWord4) record->bl.bl_new_rec_type_1;
2058
sum ^= XT_CHECKSUM4_REC(free_rec_id);
2059
dptr = &record->bl.bl_rec_type_1;
2060
rec_size -= offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1);
2062
case XT_LOG_ENT_REC_MOVED:
2063
case XT_LOG_ENT_REC_CLEANED:
2064
case XT_LOG_ENT_REC_CLEANED_1:
2065
case XT_LOG_ENT_REC_UNLINKED:
2066
op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);
2067
tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
2068
rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
2069
dptr = &record->xw.xw_rec_type_1;
2070
rec_size -= offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
2072
case XT_LOG_ENT_ROW_NEW:
2073
case XT_LOG_ENT_ROW_NEW_FL:
2074
op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);
2075
tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
2076
rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
2077
if (record->xh.xh_status_1 == XT_LOG_ENT_ROW_NEW) {
2078
dptr = (xtWord1 *) record + offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
2079
rec_size -= offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
2082
free_rec_id = XT_GET_DISK_4(record->xa.xa_free_list_4);
2083
sum ^= XT_CHECKSUM4_REC(free_rec_id);
2084
dptr = (xtWord1 *) record + sizeof(XTactRowAddedEntryDRec);
2085
rec_size -= sizeof(XTactRowAddedEntryDRec);
2088
case XT_LOG_ENT_ROW_ADD_REC:
2089
case XT_LOG_ENT_ROW_SET:
2090
case XT_LOG_ENT_ROW_FREED:
2091
op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);
2092
tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);
2093
rec_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
2094
dptr = (xtWord1 *) &record->wr.wr_ref_id_4;
2095
rec_size -= offsetof(XTactWriteRowEntryDRec, wr_ref_id_4);
2097
case XT_LOG_ENT_OP_SYNC:
2098
return record->xl.xl_checksum_1 == (XT_CHECKSUM_1(XT_GET_DISK_4(record->os.os_time_4)) ^ XT_CHECKSUM_1(log_id));
2099
case XT_LOG_ENT_NO_OP:
2100
sum = XT_GET_DISK_4(record->no.no_tab_id_4) ^ XT_GET_DISK_4(record->no.no_op_seq_4);
2101
return record->xe.xe_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
2102
case XT_LOG_ENT_END_OF_LOG:
2104
case XT_LOG_ENT_PREPARE:
2106
op_seq = XT_GET_DISK_4(record->xp.xp_xact_id_4);
2109
dptr = record->xp.xp_xa_data;
2110
rec_size -= offsetof(XTXactPrepareEntryDRec, xp_xa_data);
2117
/* Common payload checksum (mirrors the writer's computation): */
sum ^= (xtWord4) op_seq ^ ((xtWord4) tab_id << 8) ^ XT_CHECKSUM4_REC(rec_id);
2119
if ((g = sum & 0xF0000000)) {
2120
sum = sum ^ (g >> 24);
2123
for (u_int i=0; i<(u_int) rec_size; i++) {
2124
sum = (sum << 4) + *dptr;
2125
if ((g = sum & 0xF0000000)) {
2126
sum = sum ^ (g >> 24);
2132
if (check_size == 1) {
2133
if (record->xh.xh_checksum_1 != (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id))) {
2138
if (XT_GET_DISK_2(record->xu.xu_checksum_2) != (XT_CHECKSUM_2(sum) ^ XT_CHECKSUM_2(log_id))) {
2145
/*
 * Read the next record from the transaction log into *ret_entry.
 *
 * The sequential-read position (seq->xseq_rec_log_offset) is first
 * advanced past the previously returned record (xseq_record_len must
 * start out 0).  If the new position lies outside the buffered window,
 * the buffer is refilled via xlog_rnd_read().  The record length is
 * then derived from the entry type; records that straddle the end of
 * the buffer are shifted to the front and completed with another read.
 *
 * NOTE(review): this extraction has lost physical lines (braces,
 * 'break' statements, local declarations such as len/rec_offset/
 * max_rec_len/tfer/size, and labels like read_from_buffer), and stray
 * line-number residue is interleaved with the code.  The code below is
 * preserved byte-for-byte; only comments were touched.  Reconcile with
 * the full source before changing anything here.
 */
xtBool XTDatabaseLog::xlog_seq_next(XTXactSeqReadPtr seq, XTXactLogBufferDPtr *ret_entry, xtBool verify, XTThreadPtr thread)
2147
XTXactLogBufferDPtr record;
2153
/* NOTE(review): check_size is not used in the visible lines -- confirm
 * its role (presumably checksum width) against the full source. */
u_int check_size = 1;
2155
/* Go to the next record (xseq_record_len must be initialized
2156
* to 0 for this to work.
*/
2158
seq->xseq_rec_log_offset += seq->xseq_record_len;
2159
seq->xseq_record_len = 0;
2161
if (seq->xseq_rec_log_offset < seq->xseq_buf_log_offset ||
2162
seq->xseq_rec_log_offset >= seq->xseq_buf_log_offset + (xtLogOffset) seq->xseq_buffer_len) {
2163
/* The current position is nowhere near the buffer, read data into the
buffer: */
2166
tfer = seq->xseq_buffer_size;
2167
if (!xlog_rnd_read(seq, seq->xseq_rec_log_id, seq->xseq_rec_log_offset, tfer, seq->xseq_buffer, &tfer, thread))
2169
seq->xseq_buf_log_offset = seq->xseq_rec_log_offset;
2170
seq->xseq_buffer_len = tfer;
2172
/* Should we go to the next log? */
2178
/* The start of the record is in the buffer: */
2180
rec_offset = (size_t) (seq->xseq_rec_log_offset - seq->xseq_buf_log_offset);
2181
max_rec_len = seq->xseq_buffer_len - rec_offset;
2184
/* Check the type of record: */
2185
record = (XTXactLogBufferDPtr) (seq->xseq_buffer + rec_offset);
2186
/* Each case determines 'len', the on-disk size of this entry type.
 * NOTE(review): the 'break' statements between cases were lost in
 * extraction. */
switch (record->xh.xh_status_1) {
2187
case XT_LOG_ENT_HEADER:
2188
len = sizeof(XTXactLogHeaderDRec);
2190
case XT_LOG_ENT_NEW_LOG:
2191
case XT_LOG_ENT_DEL_LOG:
2192
len = sizeof(XTXactNewLogEntryDRec);
2194
case XT_LOG_ENT_NEW_TAB:
2195
len = sizeof(XTXactNewTabEntryDRec);
2197
case XT_LOG_ENT_COMMIT:
2198
case XT_LOG_ENT_ABORT:
2199
len = sizeof(XTXactEndEntryDRec);
2201
case XT_LOG_ENT_CLEANUP:
2202
len = sizeof(XTXactCleanupEntryDRec);
2204
case XT_LOG_ENT_REC_MODIFIED:
2205
case XT_LOG_ENT_UPDATE:
2206
case XT_LOG_ENT_INSERT:
2207
case XT_LOG_ENT_DELETE:
2208
case XT_LOG_ENT_UPDATE_BG:
2209
case XT_LOG_ENT_INSERT_BG:
2210
case XT_LOG_ENT_DELETE_BG:
2212
/* Variable-length entry: fixed header plus a 2-byte size field. */
len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
2213
if (len > max_rec_len)
2214
/* The size is not in the buffer: */
2216
len += (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
2218
case XT_LOG_ENT_UPDATE_FL:
2219
case XT_LOG_ENT_INSERT_FL:
2220
case XT_LOG_ENT_DELETE_FL:
2221
case XT_LOG_ENT_UPDATE_FL_BG:
2222
case XT_LOG_ENT_INSERT_FL_BG:
2223
case XT_LOG_ENT_DELETE_FL_BG:
2225
len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1);
2226
if (len > max_rec_len)
2227
/* The size is not in the buffer: */
2229
len += (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
2231
case XT_DEFUNKT_REC_FREED:
2232
case XT_DEFUNKT_REC_REMOVED:
2233
case XT_DEFUNKT_REC_REMOVED_EXT:
2234
/* [(7)] REMOVE is now an extended version of FREE! */
2235
len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);
2237
case XT_LOG_ENT_REC_REMOVED_BI:
2239
len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1);
2240
if (len > max_rec_len)
2241
/* The size is not in the buffer: */
2243
len += (size_t) XT_GET_DISK_2(record->rb.rb_size_2);
2245
case XT_LOG_ENT_REC_REMOVED_BI_L:
2247
len = offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1);
2248
if (len > max_rec_len)
2249
/* The size is not in the buffer: */
2251
len += (size_t) XT_GET_DISK_2(record->bl.bl_size_2);
2253
case XT_LOG_ENT_REC_MOVED:
2254
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;
2256
case XT_LOG_ENT_REC_CLEANED:
2257
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
2259
case XT_LOG_ENT_REC_CLEANED_1:
2260
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;
2262
case XT_LOG_ENT_REC_UNLINKED:
2263
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
2265
case XT_LOG_ENT_ROW_NEW:
2266
len = offsetof(XTactRowAddedEntryDRec, xa_row_id_4) + XT_ROW_ID_SIZE;
2268
case XT_LOG_ENT_ROW_NEW_FL:
2269
len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4) + XT_ROW_ID_SIZE;
2271
case XT_LOG_ENT_ROW_ADD_REC:
2272
case XT_LOG_ENT_ROW_SET:
2273
case XT_LOG_ENT_ROW_FREED:
2274
len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + XT_REF_ID_SIZE;
2276
case XT_LOG_ENT_OP_SYNC:
2277
len = sizeof(XTactOpSyncEntryDRec);
2279
case XT_LOG_ENT_NO_OP:
2280
len = sizeof(XTactNoOpEntryDRec);
2282
case XT_LOG_ENT_END_OF_LOG: {
2283
/* Skip the padding up to the end of the log file. */
off_t eof = seq->xseq_log_eof, adjust;
2285
if (eof > seq->xseq_rec_log_offset) {
2286
adjust = eof - seq->xseq_rec_log_offset;
2288
seq->xseq_record_len = (size_t) adjust;
2292
case XT_LOG_ENT_PREPARE:
2294
len = offsetof(XTXactPrepareEntryDRec, xp_xa_data);
2295
if (len > max_rec_len)
2296
/* The size is not in the buffer: */
2298
len += (size_t) record->xp.xp_xa_len_1;
2301
/* It is possible to land here after a crash, if the
2302
* log was not completely written.
*/
2304
seq->xseq_record_len = 0;
2308
ASSERT_NS(len <= seq->xseq_buffer_size);
2309
if (len <= max_rec_len) {
2311
if (!xlog_verify(record, len, seq->xseq_rec_log_id)) {
2316
/* The record is completely in the buffer: */
2317
seq->xseq_record_len = len;
2318
*ret_entry = record;
2322
/* The record is partially in the buffer. */
2323
memmove(seq->xseq_buffer, seq->xseq_buffer + rec_offset, max_rec_len);
2324
seq->xseq_buf_log_offset += rec_offset;
2325
seq->xseq_buffer_len = max_rec_len;
2327
/* Read the rest, as far as possible: */
2328
tfer = seq->xseq_buffer_size - max_rec_len;
2329
if (!xlog_rnd_read(seq, seq->xseq_rec_log_id, seq->xseq_buf_log_offset + max_rec_len, tfer, seq->xseq_buffer + max_rec_len, &tfer, thread))
2331
seq->xseq_buffer_len += tfer;
2333
if (seq->xseq_buffer_len < len) {
2334
/* A partial record is in the log, must be the end of the log: */
2338
/* The record is now completely in the buffer: */
2339
seq->xseq_record_len = len;
2340
*ret_entry = (XTXactLogBufferDPtr) seq->xseq_buffer;
2344
ASSERT_NS(len <= seq->xseq_buffer_size);
2345
memmove(seq->xseq_buffer, seq->xseq_buffer + rec_offset, max_rec_len);
2346
seq->xseq_buf_log_offset += rec_offset;
2347
seq->xseq_buffer_len = max_rec_len;
2349
/* Read the rest, as far as possible: */
2350
tfer = seq->xseq_buffer_size - max_rec_len;
2351
if (!xlog_rnd_read(seq, seq->xseq_rec_log_id, seq->xseq_buf_log_offset + max_rec_len, tfer, seq->xseq_buffer + max_rec_len, &tfer, thread))
2353
seq->xseq_buffer_len += tfer;
2355
if (seq->xseq_buffer_len < len + size) {
2356
/* We did not get as much as we need, return an empty record: */
2360
goto read_from_buffer;
2367
/*
 * Skip 'size' bytes of the current record.  The skipped bytes are
 * consumed together with the record on the next call to
 * xlog_seq_next(), which advances xseq_rec_log_offset by
 * xseq_record_len.
 *
 * (Lost brace lines restored from the surrounding structure.)
 */
void XTDatabaseLog::xlog_seq_skip(XTXactSeqReadPtr seq, size_t size)
{
	seq->xseq_record_len += size;
}
2372
/* ----------------------------------------------------------------------
 * W R I T E R   P R O C E S S
 *
 * The log has been written. Wake the writer to commit the
 * data to disk, if the transaction log cache is full.
 *
 * Data may not be written to the database until it has been
 * flushed to the log.
 *
 * This is because there is no way to undo changes to the
 * database.
 *
 * However, I have discovered that writing constantly in the
 * background can disturb the I/O in the foreground.
 *
 * So we can delay the writing of the database. But we should
 * not delay it longer than we have transaction log cache.
 *
 * If so, the data that we need will fall out of the cache
 * and we will have to read it again.
 */
2395
/*
 * Called after log data has been written: if the writer thread is
 * idle and the amount of log data not yet applied to the database is
 * about to fall out of the transaction log cache, wake the writer.
 *
 * (Lost brace lines restored from the surrounding structure.)
 */
static void xlog_wr_log_written(XTDatabaseHPtr db)
{
	if (db->db_wr_idle) {
		xtWord8 cached_bytes;

		/* Determine if the cached log data is about to fall out of the cache. */
		cached_bytes = db->db_xlog.xl_log_bytes_written - db->db_xlog.xl_log_bytes_read;
		/* The limit is 75%: */
		if (cached_bytes >= xt_xlog_cache.xlc_upper_limit) {
			if (!xt_broadcast_cond_ns(&db->db_wr_cond))
				xt_log_and_clear_exception_ns();
		}
	}
}
2410
/* Reasons for the writer to apply buffered log data to the database
 * (returned by xlog_wr_wait_for_write_condition()):
 */
#define XT_MORE_TO_WRITE	1
#define XT_FREER_WAITING	2
#define XT_NO_ACTIVITY		3
#define XT_LOG_CACHE_FULL	4
#define XT_CHECKPOINT_REQ	5
#define XT_THREAD_WAITING	6
#define XT_TIME_TO_WRITE	7
2419
/*
 * Wait for a reason to write the data from the log to the database.
 * This can be one of the following:
 */
2423
/*
 * Block (under db_wr_lock) until there is a reason for the writer to
 * apply more log data to the database, and return that reason (one of
 * the XT_* reason codes defined above).  'old_reason' is the reason
 * returned by the previous call; when it was XT_CHECKPOINT_REQ the
 * checkpointer is notified that the writer has caught up.
 *
 * NOTE(review): braces, 'break' statements, the closing of the outer
 * wait loop and the return statement were lost in extraction; stray
 * line-number residue is interleaved.  Code preserved byte-for-byte.
 */
static int xlog_wr_wait_for_write_condition(XTThreadPtr self, XTDatabaseHPtr db, int old_reason)
2425
xtXactID last_xn_id;
2426
xtWord8 cached_bytes;
2427
int reason = XT_MORE_TO_WRITE;
2429
#ifdef TRACE_WRITER_ACTIVITY
2430
printf("WRITER --- DONE\n");
2433
xt_lock_mutex(self, &db->db_wr_lock);
2434
pushr_(xt_unlock_mutex, &db->db_wr_lock);
2437
/*
* Wake the freeer if it is waiting for this writer, before
* we go to sleep.
*/
2440
if (db->db_wr_freeer_waiting) {
2441
if (!xt_broadcast_cond_ns(&db->db_wr_cond))
2442
xt_log_and_clear_exception_ns();
2445
if (db->db_wr_flush_point_log_id == db->db_xlog.xl_flush_log_id &&
2446
db->db_wr_flush_point_log_offset == db->db_xlog.xl_flush_log_offset) {
2447
/* Wake the checkpointer to flush the indexes:
2448
* PMC 15.05.2008 - Not doing this anymore!
2450
* PMC 1.07.2009 - Added this notification again, if the
2451
* reason why we wrote is because the checkpointer may have
* requested it.
*/
2454
if (old_reason == XT_CHECKPOINT_REQ)
2455
xt_wake_checkpointer(self, db);
2457
/* Sleep as long as the flush point has not changed, from the last
2458
* target flush point.
*/
2460
while (!self->t_quit &&
2461
db->db_wr_flush_point_log_id == db->db_xlog.xl_flush_log_id &&
2462
db->db_wr_flush_point_log_offset == db->db_xlog.xl_flush_log_offset &&
2463
reason != XT_LOG_CACHE_FULL &&
2464
reason != XT_TIME_TO_WRITE &&
2465
reason != XT_CHECKPOINT_REQ) {
2468
/*
* Sleep as long as there is no reason to write any more...
*/
2470
while (!self->t_quit) {
2471
last_xn_id = db->db_xn_curr_id;
2472
db->db_wr_idle = XT_THREAD_IDLE;
2473
xt_timed_wait_cond(self, &db->db_wr_cond, &db->db_wr_lock, 500);
2474
db->db_wr_idle = XT_THREAD_BUSY;
2475
/* These are the reasons for doing work: */
2476
/* The free'er thread is waiting for the writer: */
2477
if (db->db_wr_freeer_waiting) {
2478
reason = XT_FREER_WAITING;
2481
/* Some thread is waiting for the writer: */
2482
if (db->db_wr_thread_waiting) {
2483
reason = XT_THREAD_WAITING;
2486
/* Check if the cache will soon overflow... */
2487
ASSERT(db->db_xlog.xl_log_bytes_written >= db->db_xlog.xl_log_bytes_read);
2488
ASSERT(db->db_xlog.xl_log_bytes_written >= db->db_xlog.xl_log_bytes_flushed);
2490
ASSERT(db->db_xlog.xl_log_bytes_written < db->db_xlog.xl_log_bytes_read + 500000000);
2491
/* This is the amount of data still to be written: */
2492
cached_bytes = db->db_xlog.xl_log_bytes_written - db->db_xlog.xl_log_bytes_read;
2493
/* The limit is 75%: */
2494
if (cached_bytes >= xt_xlog_cache.xlc_upper_limit) {
2495
reason = XT_LOG_CACHE_FULL;
2499
/* Create a system variable which specifies the write frequency. */
2500
if (xt_db_record_write_threshold && cached_bytes >= xt_db_record_write_threshold) {
2501
reason = XT_TIME_TO_WRITE;
2505
/* Check if we are holding up a checkpoint: */
2506
if (db->db_restart.xres_cp_required ||
2507
db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
2508
/* Enough data has been written for a checkpoint: */
2509
if (!db->db_restart.xres_is_checkpoint_pending(db->db_wr_log_id, db->db_wr_log_offset)) {
2510
/* But not enough data has been written for a checkpoint: */
2511
reason = XT_CHECKPOINT_REQ;
2515
/* There is no activity, if the current ID has not changed during
2516
* the wait, and the sweeper has nothing to do, and the checkpointer.
*/
2518
if (db->db_xn_curr_id == last_xn_id &&
2519
/* Changed xt_xn_get_curr_id(db) to db->db_xn_curr_id,
2520
* This should work because we are not concerned about the difference
2521
* between xt_xn_get_curr_id(db) and db->db_xn_curr_id,
2522
* Which is just a matter of when transactions we can expect to find
2523
* in memory (see {GAP-INC-ADD-XACT})
*/
2525
xt_xn_is_before(db->db_xn_curr_id, db->db_xn_to_clean_id) && // db->db_xn_curr_id < db->db_xn_to_clean_id
2526
!db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
2527
/* There seems to be no activity at the moment.
2528
* this might be a good time to write the log data.
*/
2530
reason = XT_NO_ACTIVITY;
2536
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2538
if (reason == XT_LOG_CACHE_FULL || reason == XT_TIME_TO_WRITE || reason == XT_CHECKPOINT_REQ) {
2539
/* Make sure that we have something to write: */
2540
/* NOTE(review): "1204" looks like a typo for 1024 (i.e. a 2MB limit) -- confirm. */
if (db->db_xlog.xlog_bytes_to_write() < 2 * 1204 * 1024)
2541
xt_xlog_flush_log(db, self);
2544
#ifdef TRACE_WRITER_ACTIVITY
2546
case XT_MORE_TO_WRITE: printf("WRITER --- still more to write...\n"); break;
2547
case XT_FREER_WAITING: printf("WRITER --- free'er waiting for writer...\n"); break;
2548
case XT_NO_ACTIVITY: printf("WRITER --- no activity...\n"); break;
2549
case XT_LOG_CACHE_FULL: printf("WRITER --- running out of log cache...\n"); break;
2550
case XT_CHECKPOINT_REQ: printf("WRITER --- enough flushed for a checkpoint...\n"); break;
2551
case XT_THREAD_WAITING: printf("WRITER --- thread waiting for writer...\n"); break;
2552
case XT_TIME_TO_WRITE: printf("WRITER --- limit of 12MB reached, time to write...\n"); break;
2559
/*
 * Raise the writer thread to normal priority when another component
 * has asked it to go faster (db_wr_faster), then clear the request.
 *
 * (Lost brace lines restored from the surrounding structure.)
 */
static void xlog_wr_could_go_faster(XTThreadPtr self, XTDatabaseHPtr db)
{
	if (db->db_wr_faster) {
		if (!db->db_wr_fast) {
			xt_set_normal_priority(self);
			db->db_wr_fast = TRUE;
		}
		db->db_wr_faster = FALSE;
	}
}
2570
/*
 * Drop the writer thread back to low priority once no one is asking
 * it to go faster any more.
 *
 * (Lost brace lines restored from the surrounding structure.)
 */
static void xlog_wr_could_go_slower(XTThreadPtr self, XTDatabaseHPtr db)
{
	if (db->db_wr_fast && !db->db_wr_faster) {
		xt_set_low_priority(self);
		db->db_wr_fast = FALSE;
	}
}
2578
/*
 * Main loop of the writer thread: sequentially reads the transaction
 * log from the writer's restart position (db_wr_log_id/db_wr_log_offset)
 * up to the current flush point, applies each record to the database
 * via xt_xres_apply_in_order(), and then waits in
 * xlog_wr_wait_for_write_condition() for a reason to write more.
 *
 * NOTE(review): braces, 'break' statements, error-handling/goto lines
 * and loop closings were lost in extraction; stray line-number residue
 * is interleaved.  Code preserved byte-for-byte.
 */
static void xlog_wr_main(XTThreadPtr self)
2580
XTDatabaseHPtr db = self->st_database;
2581
XTWriterStatePtr ws;
2582
XTXactLogBufferDPtr record;
2583
int reason = XT_NO_ACTIVITY;
2585
xt_set_low_priority(self);
2587
alloczr_(ws, xt_free_writer_state, sizeof(XTWriterStateRec), XTWriterStatePtr);
2589
ws->ws_in_recover = FALSE;
2591
if (!db->db_xlog.xlog_seq_init(&ws->ws_seqread, xt_db_log_buffer_size, FALSE))
2594
if (!db->db_xlog.xlog_seq_start(&ws->ws_seqread, db->db_wr_log_id, db->db_wr_log_offset, FALSE))
2597
while (!self->t_quit) {
2598
while (!self->t_quit) {
2599
/* Determine the point to which we can write.
2600
* This is the current log flush point!
*/
2602
xt_lock_mutex_ns(&db->db_wr_lock);
2603
db->db_wr_flush_point_log_id = db->db_xlog.xl_flush_log_id;
2604
db->db_wr_flush_point_log_offset = db->db_xlog.xl_flush_log_offset;
2605
xt_unlock_mutex_ns(&db->db_wr_lock);
2607
if (xt_comp_log_pos(db->db_wr_log_id, db->db_wr_log_offset, db->db_wr_flush_point_log_id, db->db_wr_flush_point_log_offset) >= 0) {
2611
while (!self->t_quit) {
2612
xlog_wr_could_go_faster(self, db);
2614
/* This is the restart position: */
2615
xt_lock_mutex(self, &db->db_wr_lock);
2616
pushr_(xt_unlock_mutex, &db->db_wr_lock);
2617
db->db_wr_log_id = ws->ws_seqread.xseq_rec_log_id;
2618
db->db_wr_log_offset = ws->ws_seqread.xseq_rec_log_offset + ws->ws_seqread.xseq_record_len;
2619
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2621
if (xt_comp_log_pos(db->db_wr_log_id, db->db_wr_log_offset, db->db_wr_flush_point_log_id, db->db_wr_flush_point_log_offset) >= 0) {
2625
/* Apply all changes that have been flushed to the log, to the
* database.
*/
2628
if (!db->db_xlog.xlog_seq_next(&ws->ws_seqread, &record, FALSE, self))
2633
switch (record->xl.xl_status_1) {
2634
case XT_LOG_ENT_HEADER:
2636
case XT_LOG_ENT_NEW_LOG:
2637
/* Follow the chain to the next log file: */
if (!db->db_xlog.xlog_seq_start(&ws->ws_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, TRUE))
2640
case XT_LOG_ENT_NEW_TAB:
2641
case XT_LOG_ENT_COMMIT:
2642
case XT_LOG_ENT_ABORT:
2643
case XT_LOG_ENT_CLEANUP:
2644
case XT_LOG_ENT_OP_SYNC:
2645
case XT_LOG_ENT_PREPARE:
2647
case XT_LOG_ENT_DEL_LOG:
2650
log_id = XT_GET_DISK_4(record->xl.xl_log_id_4);
2651
xt_dl_set_to_delete(self, db, log_id);
2654
xt_xres_apply_in_order(self, ws, ws->ws_seqread.xseq_rec_log_id, ws->ws_seqread.xseq_rec_log_offset, record);
2657
/* Count the number of bytes read from the log: */
2658
db->db_xlog.xl_log_bytes_read += ws->ws_seqread.xseq_record_len;
2662
#ifdef XT_SORT_REC_WRITES
2663
/* Flush because the freeer may be waiting! */
2664
xt_xres_flush_all(self, ws);
2668
xt_db_return_table_to_pool(self, ws->ws_ot);
2672
xlog_wr_could_go_slower(self, db);
2674
/* Note, we delay writing the database for a maximum of
* the time it takes the log cache to fill up.
* TODO(review): original continuation of this comment was lost -- confirm.
*/
2677
reason = xlog_wr_wait_for_write_condition(self, db, reason);
2680
freer_(); // xt_free_writer_state(ss)
2683
/*
 * Writer thread entry point: attaches to the database and runs the
 * writer work loop, logging exceptions and pausing before retrying.
 * Presumably invokes xlog_wr_main() -- the call line is missing from
 * this extraction, as are the try_/catch_ structure and the retry
 * counter ('count') declaration; stray line-number residue is
 * interleaved.  Code preserved byte-for-byte.
 */
static void *xlog_wr_run_thread(XTThreadPtr self)
2685
XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data;
2689
mysql_thread = myxt_create_thread();
2691
while (!self->t_quit) {
2694
/*
* The garbage collector requires that the database
* is in use because.
*/
2697
xt_use_database(self, db, XT_FOR_WRITER);
2699
/* This action is both safe and required (see {BACKGROUND-RELEASE-DB}) */
2700
xt_heap_release(self, self->st_database);
2705
/* This error is "normal"! */
2706
if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
2707
!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
2708
self->t_exception.e_sys_err == SIGTERM))
2709
xt_log_and_clear_exception(self);
2713
/* Avoid releasing the database (done above) */
2714
self->st_database = NULL;
2715
xt_unuse_database(self, self);
2717
/* After an exception, pause before trying again... */
2718
/* Number of seconds */
2724
db->db_wr_idle = XT_THREAD_INERR;
2725
while (!self->t_quit && count > 0) {
2729
db->db_wr_idle = XT_THREAD_BUSY;
2733
/*
* {MYSQL-THREAD-KILL}
*/
2734
myxt_destroy_thread(mysql_thread, TRUE);
2739
/*
 * Thread-data destructor for the writer thread: clears
 * db->db_wr_thread under db_wr_lock so no one uses a dead thread
 * pointer.
 *
 * (Lost brace lines restored from the surrounding structure.)
 */
static void xlog_wr_free_thread(XTThreadPtr self, void *data)
{
	XTDatabaseHPtr db = (XTDatabaseHPtr) data;

	if (db->db_wr_thread) {
		xt_lock_mutex(self, &db->db_wr_lock);
		pushr_(xt_unlock_mutex, &db->db_wr_lock);
		db->db_wr_thread = NULL;
		freer_(); // xt_unlock_mutex(&db->db_wr_lock)
	}
}
2751
/*
 * Start the background writer thread for the given database.
 * The thread is named "WR-" plus the last component of the database
 * path, and xlog_wr_free_thread() is registered so db->db_wr_thread
 * is cleared when the thread's data is freed.
 *
 * (Lost brace lines restored; sprintf replaced by a bounded
 * snprintf -- the directory component could otherwise overflow
 * 'name'.)
 */
xtPublic void xt_start_writer(XTThreadPtr self, XTDatabaseHPtr db)
{
	char name[PATH_MAX];

	snprintf(name, sizeof name, "WR-%s", xt_last_directory_of_path(db->db_main_path));
	xt_remove_dir_char(name);
	db->db_wr_thread = xt_create_daemon(self, name);
	xt_set_thread_data(db->db_wr_thread, db, xlog_wr_free_thread);
	xt_run_thread(self, db->db_wr_thread, xlog_wr_run_thread);
}
2763
/*
 * This function is called on database shutdown.
 * We will wait a certain amount of time for the writer to
 * complete its work.
 * If it takes too long we will abort!
 */
2768
/*
 * Wait (polling every 10ms) for the writer to catch up to the current
 * flush point, waking it whenever it is idle.  Logs a "Waiting..."
 * message after ~2 units and aborts the wait after ~16 (see the
 * 'then + N' checks below).
 *
 * NOTE(review): the declarations/assignments of the time variables
 * 'then' and 'now', braces and loop closings were lost in extraction;
 * stray line-number residue is interleaved.  Code preserved
 * byte-for-byte.
 */
xtPublic void xt_wait_for_writer(XTThreadPtr self, XTDatabaseHPtr db)
2771
xtBool message = FALSE;
2773
if (db->db_wr_thread) {
2775
while (xt_comp_log_pos(db->db_wr_log_id, db->db_wr_log_offset, db->db_wr_flush_point_log_id, db->db_wr_flush_point_log_offset) < 0) {
2777
xt_lock_mutex(self, &db->db_wr_lock);
2778
pushr_(xt_unlock_mutex, &db->db_wr_lock);
2779
db->db_wr_thread_waiting++;
2780
/* Wake the writer so that it can complete its work. */
2781
if (db->db_wr_idle) {
2782
if (!xt_broadcast_cond_ns(&db->db_wr_cond))
2783
xt_log_and_clear_exception_ns();
2785
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2787
xt_sleep_milli_second(10);
2789
xt_lock_mutex(self, &db->db_wr_lock);
2790
pushr_(xt_unlock_mutex, &db->db_wr_lock);
2791
db->db_wr_thread_waiting--;
2792
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2795
if (now >= then + 16) {
2796
xt_logf(XT_NT_INFO, "Aborting wait for '%s' writer\n", db->db_name);
2800
if (now >= then + 2) {
2803
xt_logf(XT_NT_INFO, "Waiting for '%s' writer...\n", db->db_name);
2809
xt_logf(XT_NT_INFO, "Writer '%s' done.\n", db->db_name);
2813
/*
 * Stop the writer thread: under db_wr_lock, mark the thread for
 * termination and wake it; then wait for it to exit, killing it if
 * necessary, and finally clear db->db_wr_thread.
 *
 * NOTE(review): the declarations of 'thr_wr' and 'thread', braces and
 * some statements were lost in extraction; stray line-number residue
 * is interleaved.  Code preserved byte-for-byte.
 */
xtPublic void xt_stop_writer(XTThreadPtr self, XTDatabaseHPtr db)
2817
if (db->db_wr_thread) {
2818
xt_lock_mutex(self, &db->db_wr_lock);
2819
pushr_(xt_unlock_mutex, &db->db_wr_lock);
2821
/* This pointer is safe as long as you have the transaction lock. */
2822
if ((thr_wr = db->db_wr_thread)) {
2823
xtThreadID tid = thr_wr->t_id;
2825
/* Make sure the thread quits when woken up. */
2826
xt_terminate_thread(self, thr_wr);
2828
/* Wake the writer thread so that it will quit: */
2829
xt_broadcast_cond(self, &db->db_wr_cond);
2831
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2834
/*
* GOTCHA: This is a weird thing but the SIGTERM directed
2835
* at a particular thread (in this case the sweeper) was
2836
* being caught by a different thread and killing the server
2837
* sometimes. Disconcerting.
2838
* (this may only be a problem on Mac OS X)
*/
2839
xt_kill_thread(thread);
2841
xt_wait_for_thread_to_exit(tid, FALSE);
2843
/* PMC - This should not be necessary to set the signal here, but in the
2844
* debugger the handler is not called!!?
*/
2845
thr_wr->t_delayed_signal = SIGTERM;
2846
xt_kill_thread(thread);
2848
db->db_wr_thread = NULL;
2851
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2856
/*
 * Insert 'block' into 'flush_buffer' (an array of flush_count entries
 * apparently kept sorted by xlb_address) at the position found by a
 * binary search, shifting the tail up by one.
 *
 * NOTE(review): the binary-search loop head and the declarations of
 * 'i' and 'r' were lost in extraction; stray line-number residue is
 * interleaved.  Code preserved byte-for-byte.
 */
static void xlog_add_to_flush_buffer(u_int flush_count, XTXLogBlockPtr *flush_buffer, XTXLogBlockPtr block)
2858
register u_int count = flush_count;
2860
register u_int guess;
2865
guess = (i + count - 1) >> 1;
2866
r = (xtInt8) block->xlb_address - (xtInt8) flush_buffer[guess]->xlb_address;
2868
// Should not happen...
2878
/* Insert at position i */
2879
memmove(flush_buffer + i + 1, flush_buffer + i, (flush_count - i) * sizeof(XTXLogBlockPtr));
2880
flush_buffer[i] = block;
2883
/*
 * Look up the cache block for (log_id, address): hash into the owning
 * cache segment's hash table and walk the chain, waiting while a
 * matching block is being read.  Returns NULL after unlocking when the
 * block is not found; the found-path return and locking contract are
 * missing from this extraction -- confirm against the full source
 * (presumably the segment lock is held and returned via *ret_seg).
 *
 * NOTE(review): braces, the hash-chain 'while' loop head and the
 * return statements were lost in extraction; stray line-number
 * residue is interleaved.  Code preserved byte-for-byte.
 */
static XTXLogBlockPtr xlog_find_block(XTOpenFilePtr file, xtLogID log_id, off_t address, XTXLogCacheSegPtr *ret_seg)
2885
register XTXLogCacheSegPtr seg;
2886
register XTXLogBlockPtr block;
2887
register u_int hash_idx;
2888
register XTXLogCacheRec *dcg = &xt_xlog_cache;
2890
seg = &dcg->xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
2891
hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % dcg->xlc_hash_size;
2893
xt_lock_mutex_ns(&seg->lcs_lock);
2895
block = seg->lcs_hash_table[hash_idx];
2897
if (block->xlb_address == address && block->xlb_log_id == log_id) {
2898
ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
2900
/* Wait if the block is being read or written.
2901
* If we will just read the data, then we don't care
2902
* if the buffer is being written.
*/
2904
if (block->xlb_state == XLC_BLOCK_READING) {
2905
if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100))
2913
block = block->xlb_next;
2916
/* Block not found: */
2917
xt_unlock_mutex_ns(&seg->lcs_lock);
2921
static int xlog_cmp_log_files(struct XTThread *self, register const void *thunk, register const void *a, register const void *b)
2923
#pragma unused(self, thunk)
2924
xtLogID lf_id = *((xtLogID *) a);
2925
XTXactLogFilePtr lf_ptr = (XTXactLogFilePtr) b;
2927
if (lf_id < lf_ptr->lf_log_id)
2929
if (lf_id == lf_ptr->lf_log_id)
2938
static xtBool xlog_free_lru_blocks()
2940
XTXLogBlockPtr block, pblock;
2945
XTXLogCacheSegPtr seg;
2947
xtBool have_global_lock = FALSE;
2949
#ifdef DEBUG_CHECK_CACHE
2950
//xt_xlog_check_cache();
2953
if (!(block = xt_xlog_cache.xlc_lru_block))
2956
ru_time = block->xlb_ru_time;
2957
log_id = block->xlb_log_id;
2958
address = block->xlb_address;
2961
hash = (address >> XT_XLC_BLOCK_SHIFTS) ^ ((off_t) log_id << 28);
2962
seg = &xt_xlog_cache.xlc_segment[hash & XLC_SEGMENT_MASK];
2963
hash_idx = (hash >> XT_XLC_SEGMENT_SHIFTS) % xt_xlog_cache.xlc_hash_size;
2965
seg = &xt_xlog_cache.xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
2966
hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % xt_xlog_cache.xlc_hash_size;
2968
xt_lock_mutex_ns(&seg->lcs_lock);
2972
block = seg->lcs_hash_table[hash_idx];
2974
if (block->xlb_address == address && block->xlb_log_id == log_id) {
2975
ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
2977
/* Try again if the block has been used in the meantime: */
2978
if (ru_time != block->xlb_ru_time) {
2979
if (have_global_lock)
2980
/* Having this lock means we have already freed at least one block so
2981
* don't bother to free more if we are having trouble.
2985
/* If the recently used time has changed, then the
2986
* block is probably no longer the LR used.
2988
xt_unlock_mutex_ns(&seg->lcs_lock);
2992
/* Wait if the block is being read: */
2993
if (block->xlb_state == XLC_BLOCK_READING) {
2994
if (have_global_lock)
2997
/* Wait for the block to be read, then try again. */
2998
if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100))
3000
xt_unlock_mutex_ns(&seg->lcs_lock);
3004
goto free_the_block;
3007
block = block->xlb_next;
3010
if (have_global_lock) {
3011
xt_unlock_mutex_ns(&xt_xlog_cache.xlc_lock);
3012
have_global_lock = FALSE;
3015
/* We did not find the block, someone else freed it... */
3016
xt_unlock_mutex_ns(&seg->lcs_lock);
3020
ASSERT_NS(block->xlb_state == XLC_BLOCK_CLEAN);
3022
/* Remove from the hash table: */
3024
pblock->xlb_next = block->xlb_next;
3026
seg->lcs_hash_table[hash_idx] = block->xlb_next;
3028
/* Now free the block */
3029
if (!have_global_lock) {
3030
xt_lock_mutex_ns(&xt_xlog_cache.xlc_lock);
3031
have_global_lock = TRUE;
3034
/* Remove from the MRU list: */
3035
if (xt_xlog_cache.xlc_lru_block == block)
3036
xt_xlog_cache.xlc_lru_block = block->xlb_mr_used;
3037
if (xt_xlog_cache.xlc_mru_block == block)
3038
xt_xlog_cache.xlc_mru_block = block->xlb_lr_used;
3039
if (block->xlb_lr_used)
3040
block->xlb_lr_used->xlb_mr_used = block->xlb_mr_used;
3041
if (block->xlb_mr_used)
3042
block->xlb_mr_used->xlb_lr_used = block->xlb_lr_used;
3044
/* Put the block on the free list: */
3045
block->xlb_next = xt_xlog_cache.xlc_free_list;
3046
xt_xlog_cache.xlc_free_list = block;
3047
xt_xlog_cache.xlc_free_count++;
3048
block->xlb_state = XLC_BLOCK_FREE;
3050
if (xt_xlog_cache.xlc_free_count < XT_XLC_MAX_FREE_COUNT) {
3051
/* Now that we have all the locks, try to free some more in this segment: */
3052
block = block->xlb_mr_used;
3053
for (u_int i=0; block && i<XLC_SEGMENT_COUNT; i++) {
3054
ru_time = block->xlb_ru_time;
3055
log_id = block->xlb_log_id;
3056
address = block->xlb_address;
3058
if (seg == &xt_xlog_cache.xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK]) {
3059
hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % xt_xlog_cache.xlc_hash_size;
3062
block = block->xlb_mr_used;
3067
xt_unlock_mutex_ns(&xt_xlog_cache.xlc_lock);
3068
xt_unlock_mutex_ns(&seg->lcs_lock);
3069
#ifdef DEBUG_CHECK_CACHE
3070
//xt_xlog_check_cache();
3075
xt_unlock_mutex_ns(&seg->lcs_lock);
3076
#ifdef DEBUG_CHECK_CACHE
3077
//xt_xlog_check_cache();