~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/xactlog_xt.cc

lp:drizzle + pbxt 1.1 + test results

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2007 PrimeBase Technologies GmbH
 
2
 *
 
3
 * PrimeBase XT
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2007-10-30   Paul McCullagh
 
20
 *
 
21
 * H&G2JCtL
 
22
 *
 
23
 * The transaction log contains all operations on the data handle
 
24
 * and row pointer files of a table.
 
25
 *
 
26
 * The transaction log does not contain operations on index data.
 
27
 */
 
28
 
 
29
#include "xt_config.h"
 
30
 
 
31
#ifdef DRIZZLED
 
32
#include <bitset>
 
33
#endif
 
34
 
 
35
#include <signal.h>
 
36
 
 
37
#include "xactlog_xt.h"
 
38
#include "database_xt.h"
 
39
#include "util_xt.h"
 
40
#include "strutil_xt.h"
 
41
#include "filesys_xt.h"
 
42
#include "myxt_xt.h"
 
43
#include "trace_xt.h"
 
44
 
 
45
#ifdef DEBUG
 
46
//#define PRINT_TABLE_MODIFICATIONS
 
47
//#define TRACE_WRITER_ACTIVITY
 
48
#endif
 
49
#ifndef XT_WIN
 
50
#ifndef XT_MAC
 
51
#define PREWRITE_LOG_COMPLETELY
 
52
#endif
 
53
#endif
 
54
 
 
55
static void xlog_wr_log_written(XTDatabaseHPtr db);
 
56
 
 
57
/*
 
58
 * -----------------------------------------------------------------------
 
59
 * T R A N S A C T I O N   L O G   C A C H E
 
60
 */
 
61
 
 
62
static XTXLogCacheRec   xt_xlog_cache;
 
63
 
 
64
/*
 
65
 * Initialize the disk cache.
 
66
 */
 
67
xtPublic void xt_xlog_init(XTThreadPtr self, size_t cache_size)
 
68
{
 
69
        XTXLogBlockPtr  block;
 
70
 
 
71
        /*
 
72
         * This is required to ensure that the block
 
73
         * works!
 
74
         */
 
75
 
 
76
        /* Determine the number of block that will fit into the given memory: */
 
77
        /*
 
78
        xt_xlog_cache.xlc_hash_size = (cache_size / (XLC_SEGMENT_COUNT * sizeof(XTXLogBlockPtr) + sizeof(XTXLogBlockRec))) / (XLC_SEGMENT_COUNT >> 1);
 
79
        xt_xlog_cache.xlc_block_count = (cache_size - (XLC_SEGMENT_COUNT * xt_xlog_cache.xlc_hash_size * sizeof(XTXLogBlockPtr))) / sizeof(XTXLogBlockRec);
 
80
        */
 
81
        /* Do not count the size of the cache directory towards the cache size: */
 
82
        xt_xlog_cache.xlc_block_count = cache_size / sizeof(XTXLogBlockRec);
 
83
        xt_xlog_cache.xlc_upper_limit = ((xtWord8) xt_xlog_cache.xlc_block_count * (xtWord8) XT_XLC_BLOCK_SIZE * (xtWord8) 3) / (xtWord8) 4;
 
84
        xt_xlog_cache.xlc_hash_size = xt_xlog_cache.xlc_block_count / (XLC_SEGMENT_COUNT >> 1);
 
85
        if (!xt_xlog_cache.xlc_hash_size)
 
86
                xt_xlog_cache.xlc_hash_size = 1;
 
87
 
 
88
        try_(a) {
 
89
                for (u_int i=0; i<XLC_SEGMENT_COUNT; i++) {
 
90
                        xt_xlog_cache.xlc_segment[i].lcs_hash_table = (XTXLogBlockPtr *) xt_calloc(self, xt_xlog_cache.xlc_hash_size * sizeof(XTXLogBlockPtr));
 
91
                        xt_init_mutex_with_autoname(self, &xt_xlog_cache.xlc_segment[i].lcs_lock);
 
92
                        xt_init_cond(self, &xt_xlog_cache.xlc_segment[i].lcs_cond);
 
93
                }
 
94
 
 
95
                block = (XTXLogBlockPtr) xt_malloc(self, xt_xlog_cache.xlc_block_count * sizeof(XTXLogBlockRec));
 
96
                xt_xlog_cache.xlc_blocks = block; 
 
97
                xt_xlog_cache.xlc_blocks_end = (XTXLogBlockPtr) ((char *) block + (xt_xlog_cache.xlc_block_count * sizeof(XTXLogBlockRec))); 
 
98
                xt_xlog_cache.xlc_next_to_free = block; 
 
99
                xt_init_mutex_with_autoname(self, &xt_xlog_cache.xlc_lock);
 
100
                xt_init_cond(self, &xt_xlog_cache.xlc_cond);
 
101
 
 
102
                for (u_int i=0; i<xt_xlog_cache.xlc_block_count; i++) {
 
103
                        block->xlb_address = 0;
 
104
                        block->xlb_log_id = 0;
 
105
                        block->xlb_state = XLC_BLOCK_FREE;
 
106
                        block++;
 
107
                }
 
108
                xt_xlog_cache.xlc_free_count = xt_xlog_cache.xlc_block_count;
 
109
        }
 
110
        catch_(a) {
 
111
                XTException e;
 
112
 
 
113
                xt_enter_exception_handler(self, &e);
 
114
                xt_xlog_exit(self);
 
115
                xt_exit_exception_handler(self, &e);
 
116
                xt_throw(self);
 
117
        }
 
118
        cont_(a);
 
119
}
 
120
 
 
121
xtPublic void xt_xlog_exit(XTThreadPtr)
 
122
{
 
123
        for (u_int i=0; i<XLC_SEGMENT_COUNT; i++) {
 
124
                if (xt_xlog_cache.xlc_segment[i].lcs_hash_table) {
 
125
                        xt_free_ns(xt_xlog_cache.xlc_segment[i].lcs_hash_table);
 
126
                        xt_xlog_cache.xlc_segment[i].lcs_hash_table = NULL;
 
127
                        xt_free_mutex(&xt_xlog_cache.xlc_segment[i].lcs_lock);
 
128
                        xt_free_cond(&xt_xlog_cache.xlc_segment[i].lcs_cond);
 
129
                }
 
130
        }
 
131
 
 
132
        if (xt_xlog_cache.xlc_blocks) {
 
133
                xt_free_ns(xt_xlog_cache.xlc_blocks);
 
134
                xt_xlog_cache.xlc_blocks = NULL;
 
135
                xt_free_mutex(&xt_xlog_cache.xlc_lock);
 
136
                xt_free_cond(&xt_xlog_cache.xlc_cond);
 
137
        }
 
138
        memset(&xt_xlog_cache, 0, sizeof(xt_xlog_cache));
 
139
}
 
140
 
 
141
xtPublic xtInt8 xt_xlog_get_usage()
 
142
{
 
143
        xtInt8 size;
 
144
 
 
145
        size = (xtInt8) (xt_xlog_cache.xlc_block_count - xt_xlog_cache.xlc_free_count) * sizeof(XTXLogBlockRec);
 
146
        return size;
 
147
}
 
148
 
 
149
xtPublic xtInt8 xt_xlog_get_size()
 
150
{
 
151
        xtInt8 size;
 
152
 
 
153
        size = (xtInt8) xt_xlog_cache.xlc_block_count * sizeof(XTXLogBlockRec);
 
154
        return size;
 
155
}
 
156
 
 
157
xtPublic xtLogID xt_xlog_get_min_log(XTThreadPtr self, XTDatabaseHPtr db)
 
158
{
 
159
        char                    path[PATH_MAX];
 
160
        XTOpenDirPtr    od;
 
161
        char                    *file;
 
162
        xtLogID                 log_id, min_log = 0;
 
163
 
 
164
        xt_strcpy(PATH_MAX, path, db->db_main_path);
 
165
        xt_add_system_dir(PATH_MAX, path);
 
166
        if (xt_fs_exists(path)) {
 
167
                pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
 
168
                while (xt_dir_next(self, od)) {
 
169
                        file = xt_dir_name(self, od);
 
170
                        if (xt_starts_with(file, "xlog")) {
 
171
                                if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
 
172
                                        if (!min_log || log_id < min_log)
 
173
                                                min_log = log_id;
 
174
                                }
 
175
                        }
 
176
                }
 
177
                freer_(); // xt_dir_close(od)
 
178
        }
 
179
        if (!min_log)
 
180
                return 1;
 
181
        return min_log;
 
182
}
 
183
 
 
184
xtPublic void xt_xlog_delete_logs(XTThreadPtr self, XTDatabaseHPtr db)
 
185
{
 
186
        char                    path[PATH_MAX];
 
187
        XTOpenDirPtr    od;
 
188
        char                    *file;
 
189
 
 
190
        /* Close all the index logs before we delete them: */
 
191
        db->db_indlogs.ilp_close(self, TRUE);
 
192
 
 
193
        /* Close the transaction logs too: */
 
194
        db->db_xlog.xlog_close(self);
 
195
 
 
196
        xt_strcpy(PATH_MAX, path, db->db_main_path);
 
197
        xt_add_system_dir(PATH_MAX, path);
 
198
        if (!xt_fs_exists(path))
 
199
                return;
 
200
        pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
 
201
        while (xt_dir_next(self, od)) {
 
202
                file = xt_dir_name(self, od);
 
203
                if (xt_ends_with(file, ".xt")) {
 
204
                        xt_add_dir_char(PATH_MAX, path);
 
205
                        xt_strcat(PATH_MAX, path, file);
 
206
                        xt_fs_delete(self, path);
 
207
                        xt_remove_last_name_of_path(path);
 
208
                }
 
209
        }
 
210
        freer_(); // xt_dir_close(od)
 
211
 
 
212
        /* I no longer attach the condition: !db->db_multi_path
 
213
         * to removing this directory. This is because
 
214
         * the pbxt directory must now be removed explicitly
 
215
         * by drop database, or by delete all the PBXT
 
216
         * system tables.
 
217
         */
 
218
        if (!xt_fs_rmdir(NULL, path))
 
219
                xt_log_and_clear_exception(self);
 
220
}
 
221
 
 
222
#ifdef DEBUG_CHECK_CACHE
 
223
static void xt_xlog_check_cache(void)
 
224
{
 
225
        XTXLogBlockPtr  block, pblock;
 
226
        u_int                   used_count;
 
227
        u_int                   free_count;
 
228
 
 
229
        // Check the LRU list:
 
230
        used_count = 0;
 
231
        pblock = NULL;
 
232
        block = xt_xlog_cache.xlc_lru_block;
 
233
        while (block) {
 
234
                used_count++;
 
235
                ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
 
236
                ASSERT_NS(block->xlb_lr_used == pblock);
 
237
                pblock = block;
 
238
                block = block->xlb_mr_used;
 
239
        }
 
240
        ASSERT_NS(xt_xlog_cache.xlc_mru_block == pblock);
 
241
        ASSERT_NS(xt_xlog_cache.xlc_free_count + used_count == xt_xlog_cache.xlc_block_count);
 
242
 
 
243
        // Check the free list:
 
244
        free_count = 0;
 
245
        block = xt_xlog_cache.xlc_free_list;
 
246
        while (block) {
 
247
                free_count++;
 
248
                ASSERT_NS(block->xlb_state == XLC_BLOCK_FREE);
 
249
                block = block->xlb_next;
 
250
        }
 
251
        ASSERT_NS(xt_xlog_cache.xlc_free_count == free_count);
 
252
}
 
253
#endif
 
254
 
 
255
#ifdef FOR_DEBUG
 
256
static void xlog_check_lru_list(XTXLogBlockPtr block)
 
257
{
 
258
        XTXLogBlockPtr list_block, plist_block;
 
259
        
 
260
        plist_block = NULL;
 
261
        list_block = xt_xlog_cache.xlc_lru_block;
 
262
        while (list_block) {
 
263
                ASSERT_NS(block != list_block);
 
264
                ASSERT_NS(list_block->xlb_lr_used == plist_block);
 
265
                plist_block = list_block;
 
266
                list_block = list_block->xlb_mr_used;
 
267
        }
 
268
        ASSERT_NS(xt_xlog_cache.xlc_mru_block == plist_block);
 
269
}
 
270
#endif
 
271
 
 
272
/*
 
273
 * Log cache blocks are used and freed on a round-robin basis.
 
274
 * In addition, only data read by restart, and data transferred
 
275
 * from the transaction log are stored in the log cache.
 
276
 *
 
277
 * This ensures that the log cache contains the most
 
278
 * recently written log data.
 
279
 *
 
280
 * If the sweeper gets behind due to a long running transaction
 
281
 * then it falls out of the log cache, and must read from
 
282
 * the log files directly.
 
283
 *
 
284
 * This data read is no longer cached as it was previously.
 
285
 * This has the advantage that it does not disturb the writer
 
286
 * thread which would otherwise hit the cache.
 
287
 *
 
288
 * If transactions are not too long, it should be possible
 
289
 * to keep the sweeper in the log cache.
 
290
 */
 
291
static xtBool xlog_free_block(XTXLogBlockPtr to_free)
 
292
{
 
293
        XTXLogBlockPtr          block, pblock;
 
294
        xtLogID                         log_id;
 
295
        off_t                           address;
 
296
        XTXLogCacheSegPtr       seg;
 
297
        u_int                           hash_idx;
 
298
 
 
299
        retry:
 
300
        log_id = to_free->xlb_log_id;
 
301
        address = to_free->xlb_address;
 
302
 
 
303
        seg = &xt_xlog_cache.xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
 
304
        hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % xt_xlog_cache.xlc_hash_size;
 
305
 
 
306
        xt_lock_mutex_ns(&seg->lcs_lock);
 
307
        if (to_free->xlb_state == XLC_BLOCK_FREE)
 
308
                goto done_ok;
 
309
        if (to_free->xlb_log_id != log_id || to_free->xlb_address != address) {
 
310
                xt_unlock_mutex_ns(&seg->lcs_lock);
 
311
                goto retry;
 
312
        }
 
313
 
 
314
        pblock = NULL;
 
315
        block = seg->lcs_hash_table[hash_idx];
 
316
        while (block) {
 
317
                if (block->xlb_address == address && block->xlb_log_id == log_id) {
 
318
                        ASSERT_NS(block == to_free);
 
319
                        ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
 
320
                        
 
321
                        /* Wait if the block is being read: */
 
322
                        if (block->xlb_state == XLC_BLOCK_READING) {
 
323
                                /* Wait for the block to be read, then try again. */
 
324
                                if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100))
 
325
                                        goto failed;
 
326
                                xt_unlock_mutex_ns(&seg->lcs_lock);
 
327
                                goto retry;
 
328
                        }
 
329
                        
 
330
                        goto free_the_block;
 
331
                }
 
332
                pblock = block;
 
333
                block = block->xlb_next;
 
334
        }
 
335
 
 
336
        /* We did not find the block, someone else freed it... */
 
337
        xt_unlock_mutex_ns(&seg->lcs_lock);
 
338
        goto retry;
 
339
 
 
340
        free_the_block:
 
341
        ASSERT_NS(block->xlb_state == XLC_BLOCK_CLEAN);
 
342
 
 
343
        /* Remove from the hash table: */
 
344
        if (pblock)
 
345
                pblock->xlb_next = block->xlb_next;
 
346
        else
 
347
                seg->lcs_hash_table[hash_idx] = block->xlb_next;
 
348
 
 
349
        /* Free the block: */
 
350
        xt_xlog_cache.xlc_free_count++;
 
351
        block->xlb_state = XLC_BLOCK_FREE;
 
352
 
 
353
        done_ok:
 
354
        xt_unlock_mutex_ns(&seg->lcs_lock);
 
355
        return OK;
 
356
        
 
357
        failed:
 
358
        xt_unlock_mutex_ns(&seg->lcs_lock);
 
359
        return FAILED;
 
360
}
 
361
 
 
362
/* Values for the fetch_type argument of xlog_fetch_block(). They select what happens when the block is not in the cache: */
#define XT_FETCH_READ           0	/* Take a free block and fill it by reading from the log file */
 
363
#define XT_FETCH_BLANK          1	/* Take a free block and zero-fill it, without reading the file */
 
364
#define XT_FETCH_TEST           2	/* Take no block: report the miss by returning a NULL block */
 
365
 
 
366
static xtBool xlog_fetch_block(XTXLogBlockPtr *ret_block, XTOpenFilePtr file, xtLogID log_id, off_t address, XTXLogCacheSegPtr *ret_seg, int fetch_type, XTThreadPtr thread)
 
367
{
 
368
        register XTXLogBlockPtr         block;
 
369
        register XTXLogCacheSegPtr      seg;
 
370
        register u_int                          hash_idx;
 
371
        register XTXLogCacheRec         *dcg = &xt_xlog_cache;
 
372
        size_t                                          red_size;
 
373
 
 
374
        /* Make sure we have a free block ready (to avoid unlock below): */
 
375
        if (fetch_type != XT_FETCH_TEST && dcg->xlc_next_to_free->xlb_state != XLC_BLOCK_FREE) {
 
376
                if (!xlog_free_block(dcg->xlc_next_to_free))
 
377
                        return FAILED;
 
378
        }
 
379
 
 
380
        seg = &dcg->xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
 
381
        hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % dcg->xlc_hash_size;
 
382
 
 
383
        xt_lock_mutex_ns(&seg->lcs_lock);
 
384
        retry:
 
385
        block = seg->lcs_hash_table[hash_idx];
 
386
        while (block) {
 
387
                if (block->xlb_address == address && block->xlb_log_id == log_id) {
 
388
                        ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
 
389
 
 
390
                        /*
 
391
                         * Wait if the block is being read.
 
392
                         */
 
393
                        if (block->xlb_state == XLC_BLOCK_READING) {
 
394
                                if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100)) {
 
395
                                        xt_unlock_mutex_ns(&seg->lcs_lock);
 
396
                                        return FAILED;
 
397
                                }
 
398
                                goto retry;
 
399
                        }
 
400
 
 
401
                        *ret_seg = seg;
 
402
                        *ret_block = block;
 
403
                        thread->st_statistics.st_xlog_cache_hit++;
 
404
                        return OK;
 
405
                }
 
406
                block = block->xlb_next;
 
407
        }
 
408
 
 
409
        if (fetch_type == XT_FETCH_TEST) {
 
410
                xt_unlock_mutex_ns(&seg->lcs_lock);
 
411
                *ret_seg = NULL;
 
412
                *ret_block = NULL;
 
413
                thread->st_statistics.st_xlog_cache_miss++;
 
414
                return OK;
 
415
        }
 
416
 
 
417
        /* Block not found: */
 
418
        get_free_block:
 
419
        if (dcg->xlc_next_to_free->xlb_state != XLC_BLOCK_FREE) {
 
420
                xt_unlock_mutex_ns(&seg->lcs_lock);
 
421
                if (!xlog_free_block(dcg->xlc_next_to_free))
 
422
                        return FAILED;
 
423
                xt_lock_mutex_ns(&seg->lcs_lock);
 
424
        }
 
425
 
 
426
        xt_lock_mutex_ns(&dcg->xlc_lock);
 
427
        block = dcg->xlc_next_to_free;
 
428
        if (block->xlb_state != XLC_BLOCK_FREE) {
 
429
                xt_unlock_mutex_ns(&dcg->xlc_lock);
 
430
                goto get_free_block;
 
431
        }
 
432
        dcg->xlc_next_to_free++;
 
433
        if (dcg->xlc_next_to_free == dcg->xlc_blocks_end)
 
434
                dcg->xlc_next_to_free = dcg->xlc_blocks;
 
435
        dcg->xlc_free_count--;
 
436
 
 
437
        if (fetch_type == XT_FETCH_READ) {
 
438
                block->xlb_address = address;
 
439
                block->xlb_log_id = log_id;
 
440
                block->xlb_state = XLC_BLOCK_READING;
 
441
 
 
442
                xt_unlock_mutex_ns(&dcg->xlc_lock);
 
443
 
 
444
                /* Add the block to the hash table: */
 
445
                block->xlb_next = seg->lcs_hash_table[hash_idx];
 
446
                seg->lcs_hash_table[hash_idx] = block;
 
447
 
 
448
                /* Read the block into memory: */
 
449
                xt_unlock_mutex_ns(&seg->lcs_lock);
 
450
 
 
451
                if (!xt_pread_file(file, address, XT_XLC_BLOCK_SIZE, 0, block->xlb_data, &red_size, &thread->st_statistics.st_xlog, thread))
 
452
                        return FAILED;
 
453
                memset(block->xlb_data + red_size, 0, XT_XLC_BLOCK_SIZE - red_size);
 
454
                thread->st_statistics.st_xlog_cache_miss++;
 
455
 
 
456
                xt_lock_mutex_ns(&seg->lcs_lock);
 
457
                block->xlb_state = XLC_BLOCK_CLEAN;
 
458
                xt_cond_wakeall(&seg->lcs_cond);
 
459
        }
 
460
        else {
 
461
                block->xlb_address = address;
 
462
                block->xlb_log_id = log_id;
 
463
                block->xlb_state = XLC_BLOCK_CLEAN;
 
464
                memset(block->xlb_data, 0, XT_XLC_BLOCK_SIZE);
 
465
 
 
466
                xt_unlock_mutex_ns(&dcg->xlc_lock);
 
467
 
 
468
                /* Add the block to the hash table: */
 
469
                block->xlb_next = seg->lcs_hash_table[hash_idx];
 
470
                seg->lcs_hash_table[hash_idx] = block;
 
471
        }
 
472
 
 
473
        *ret_seg = seg;
 
474
        *ret_block = block;
 
475
#ifdef DEBUG_CHECK_CACHE
 
476
        //xt_xlog_check_cache();
 
477
#endif
 
478
        return OK;
 
479
}
 
480
 
 
481
static xtBool xlog_transfer_to_cache(XTOpenFilePtr file, xtLogID log_id, off_t offset, size_t size, xtWord1 *data, XTThreadPtr thread)
 
482
{
 
483
        off_t                           address;
 
484
        XTXLogBlockPtr          block;
 
485
        XTXLogCacheSegPtr       seg;
 
486
        size_t                          boff;
 
487
        size_t                          tfer;
 
488
        xtBool                          read_block = FALSE;
 
489
 
 
490
#ifdef DEBUG_CHECK_CACHE
 
491
        //xt_xlog_check_cache();
 
492
#endif
 
493
        /* We have to read the first block, if we are
 
494
         * not at the begining of the file:
 
495
         */
 
496
        if (offset)
 
497
                read_block = TRUE;
 
498
        address = offset & ~XT_XLC_BLOCK_MASK;
 
499
 
 
500
        boff = (size_t) (offset - address);
 
501
        tfer = XT_XLC_BLOCK_SIZE - boff;
 
502
        if (tfer > size)
 
503
                tfer = size;
 
504
        while (size > 0) {
 
505
                if (!xlog_fetch_block(&block, file, log_id, address, &seg, read_block ? XT_FETCH_READ : XT_FETCH_BLANK, thread)) {
 
506
#ifdef DEBUG_CHECK_CACHE
 
507
                        //xt_xlog_check_cache();
 
508
#endif
 
509
                        return FAILED;
 
510
                }
 
511
                ASSERT_NS(block && block->xlb_state == XLC_BLOCK_CLEAN);
 
512
                memcpy(block->xlb_data + boff, data, tfer);
 
513
                xt_unlock_mutex_ns(&seg->lcs_lock);
 
514
                size -= tfer;
 
515
                data += tfer;
 
516
 
 
517
                /* Following block need not be read
 
518
                 * because we always transfer to the
 
519
                 * end of the file!
 
520
                 */
 
521
                read_block = FALSE;
 
522
                address += XT_XLC_BLOCK_SIZE;
 
523
 
 
524
                boff = 0;
 
525
                tfer = size;
 
526
                if (tfer > XT_XLC_BLOCK_SIZE)
 
527
                        tfer = XT_XLC_BLOCK_SIZE;
 
528
        }
 
529
#ifdef DEBUG_CHECK_CACHE
 
530
        //xt_xlog_check_cache();
 
531
#endif
 
532
        return OK;
 
533
}
 
534
 
 
535
static xtBool xt_xlog_read(XTOpenFilePtr file, xtLogID log_id, off_t offset, size_t size, xtWord1 *data, xtBool load_cache, XTThreadPtr thread)
 
536
{
 
537
        off_t                           address;
 
538
        XTXLogBlockPtr          block;
 
539
        XTXLogCacheSegPtr       seg;
 
540
        size_t                          boff;
 
541
        size_t                          tfer;
 
542
 
 
543
#ifdef DEBUG_CHECK_CACHE
 
544
        //xt_xlog_check_cache();
 
545
#endif
 
546
        address = offset & ~XT_XLC_BLOCK_MASK;
 
547
        boff = (size_t) (offset - address);
 
548
        tfer = XT_XLC_BLOCK_SIZE - boff;
 
549
        if (tfer > size)
 
550
                tfer = size;
 
551
        while (size > 0) {
 
552
                if (!xlog_fetch_block(&block, file, log_id, address, &seg, load_cache ? XT_FETCH_READ : XT_FETCH_TEST, thread))
 
553
                        return FAILED;
 
554
                if (!block) {
 
555
                        size_t red_size;
 
556
 
 
557
                        if (!xt_pread_file(file, address + boff, size, 0, data, &red_size, &thread->st_statistics.st_xlog, thread))
 
558
                                return FAILED;
 
559
                        memset(data + red_size, 0, size - red_size);
 
560
                        return OK;
 
561
                }
 
562
                memcpy(data, block->xlb_data + boff, tfer);
 
563
                xt_unlock_mutex_ns(&seg->lcs_lock);
 
564
                size -= tfer;
 
565
                data += tfer;
 
566
                address += XT_XLC_BLOCK_SIZE;
 
567
                boff = 0;
 
568
                tfer = size;
 
569
                if (tfer > XT_XLC_BLOCK_SIZE)
 
570
                        tfer = XT_XLC_BLOCK_SIZE;
 
571
        }
 
572
#ifdef DEBUG_CHECK_CACHE
 
573
        //xt_xlog_check_cache();
 
574
#endif
 
575
        return OK;
 
576
}
 
577
 
 
578
static xtBool xt_xlog_write(XTOpenFilePtr file, xtLogID log_id, off_t offset, size_t size, xtWord1 *data, XTThreadPtr thread)
 
579
{
 
580
        if (!xt_pwrite_file(file, offset, size, data, &thread->st_statistics.st_xlog, thread))
 
581
                return FAILED;
 
582
        return xlog_transfer_to_cache(file, log_id, offset, size, data, thread);
 
583
}
 
584
 
 
585
/*
 
586
 * -----------------------------------------------------------------------
 
587
 * D A T A B A S E   T R A N S A C T I O N   L O G S
 
588
 */
 
589
 
 
590
void XTDatabaseLog::xlog_setup(XTThreadPtr self, XTDatabaseHPtr db, off_t inp_log_file_size, size_t transaction_buffer_size, int log_count)
 
591
{
 
592
        volatile off_t  log_file_size = inp_log_file_size;
 
593
        size_t                  log_size;
 
594
 
 
595
        try_(a) {
 
596
                memset(this, 0, sizeof(XTDatabaseLogRec));
 
597
 
 
598
                if (log_count <= 1)
 
599
                        log_count = 1;
 
600
                else if (log_count > 1000000)
 
601
                        log_count = 1000000;
 
602
 
 
603
                xl_db = db;
 
604
 
 
605
                xl_log_file_threshold = xt_align_offset(log_file_size, 1024);
 
606
                xl_log_file_count = log_count;
 
607
                xl_size_of_buffers = transaction_buffer_size;
 
608
        
 
609
                xt_init_mutex_with_autoname(self, &xl_write_lock);
 
610
                xt_init_cond(self, &xl_write_cond);
 
611
#ifdef XT_XLOG_WAIT_SPINS
 
612
                xt_writing = 0;
 
613
                xt_waiting = 0;
 
614
#else
 
615
                xt_writing = FALSE;
 
616
#endif
 
617
                xl_log_id = 0;
 
618
                xl_log_file = 0;
 
619
        
 
620
                xt_spinlock_init_with_autoname(self, &xl_buffer_lock);
 
621
 
 
622
                /* Note that we allocate a little bit more for each buffer
 
623
                 * in order to make sure that we can write a trailing record
 
624
                 * to the log buffer.
 
625
                 */
 
626
                log_size = transaction_buffer_size + sizeof(XTXactNewLogEntryDRec);
 
627
                
 
628
                /* Add in order to round the buffer to an integral of 512 */
 
629
                if (log_size % 512)
 
630
                        log_size += (512 - (log_size % 512));
 
631
 
 
632
                xl_write_log_id = 0;
 
633
                xl_write_log_offset = 0;
 
634
                xl_write_buf_pos = 0;
 
635
                xl_write_buf_pos_start = 0;
 
636
                xl_write_buffer = (xtWord1 *) xt_malloc(self, log_size);
 
637
                xl_write_done = TRUE;
 
638
 
 
639
                xl_append_log_id = 0;
 
640
                xl_append_log_offset = 0;
 
641
                xl_append_buf_pos = 0;
 
642
                xl_append_buf_pos_start = 0;
 
643
                xl_append_buffer = (xtWord1 *) xt_malloc(self, log_size);
 
644
 
 
645
                xl_last_flush_time = 10;
 
646
                xl_flush_log_id = 0;
 
647
                xl_flush_log_offset = 0;
 
648
        }
 
649
        catch_(a) {
 
650
                XTException e;
 
651
 
 
652
                xt_enter_exception_handler(self, &e);
 
653
                xlog_exit(self);
 
654
                xt_exit_exception_handler(self, &e);
 
655
                xt_throw(self);
 
656
        }
 
657
        cont_(a);
 
658
}
 
659
 
 
660
xtBool XTDatabaseLog::xlog_set_write_offset(xtLogID log_id, xtLogOffset log_offset, xtLogID max_log_id, XTThreadPtr thread)
 
661
{
 
662
        xl_max_log_id = max_log_id;
 
663
 
 
664
        xl_write_log_id = log_id;
 
665
        xl_write_log_offset = log_offset;
 
666
        xl_write_buf_pos = 0;
 
667
        xl_write_buf_pos_start = 0;
 
668
        xl_write_done = TRUE;
 
669
 
 
670
        xl_append_log_id = log_id;
 
671
        xl_append_log_offset = log_offset;
 
672
        if (log_offset == 0) {
 
673
                XTXactLogHeaderDPtr log_head;
 
674
 
 
675
                log_head = (XTXactLogHeaderDPtr) xl_append_buffer;
 
676
                memset(log_head, 0, sizeof(XTXactLogHeaderDRec));
 
677
                log_head->xh_status_1 = XT_LOG_ENT_HEADER;
 
678
                log_head->xh_checksum_1 = XT_CHECKSUM_1(log_id);
 
679
                XT_SET_DISK_4(log_head->xh_size_4, sizeof(XTXactLogHeaderDRec));
 
680
                XT_SET_DISK_4(log_head->xh_log_id_4, log_id);
 
681
                XT_SET_DISK_2(log_head->xh_version_2, XT_LOG_VERSION_NO);
 
682
                XT_SET_DISK_4(log_head->xh_magic_4, XT_LOG_FILE_MAGIC);
 
683
                xl_append_buf_pos = sizeof(XTXactLogHeaderDRec);
 
684
                xl_append_buf_pos_start = 0;
 
685
        }
 
686
        else {
 
687
                /* Start the log buffer at a block boundary: */
 
688
                size_t buf_pos = (size_t) (log_offset % 512);
 
689
 
 
690
                xl_append_buf_pos = buf_pos;
 
691
                xl_append_buf_pos_start = buf_pos;
 
692
                xl_append_log_offset = log_offset - buf_pos;
 
693
 
 
694
                if (!xlog_open_log(log_id, log_offset, thread))
 
695
                        return FAILED;
 
696
 
 
697
                if (!xt_pread_file(xl_log_file, xl_append_log_offset, buf_pos, buf_pos, xl_append_buffer, NULL, &thread->st_statistics.st_xlog, thread))
 
698
                        return FAILED;
 
699
        }
 
700
 
 
701
        xl_flush_log_id = log_id;
 
702
        xl_flush_log_offset = log_offset;
 
703
        return OK;
 
704
}
 
705
 
 
706
void XTDatabaseLog::xlog_close(XTThreadPtr self)
 
707
{
 
708
        if (xl_log_file) {
 
709
                xt_close_file(self, xl_log_file);
 
710
                xl_log_file = NULL;
 
711
        }
 
712
}
 
713
 
 
714
void XTDatabaseLog::xlog_exit(XTThreadPtr self)
 
715
{
 
716
        xt_spinlock_free(self, &xl_buffer_lock);
 
717
        xt_free_mutex(&xl_write_lock);
 
718
        xt_free_cond(&xl_write_cond);
 
719
        xlog_close(self);
 
720
        if (xl_write_buffer) {
 
721
                xt_free_ns(xl_write_buffer);
 
722
                xl_write_buffer = NULL;
 
723
        }
 
724
        if (xl_append_buffer) {
 
725
                xt_free_ns(xl_append_buffer);
 
726
                xl_append_buffer = NULL;
 
727
        }
 
728
}
 
729
 
 
730
/* Write because there is no space, or some other reason: */
#define WR_NO_SPACE             1

/* Normal commit, write and flush: */
#define WR_FLUSH                2
 
732
 
 
733
xtBool XTDatabaseLog::xlog_flush(XTThreadPtr thread)
 
734
{
 
735
        if (!xlog_flush_pending())
 
736
                return OK;
 
737
        return xlog_append(thread, 0, NULL, 0, NULL, XT_XLOG_WRITE_AND_FLUSH, NULL, NULL);
 
738
}
 
739
 
 
740
xtBool XTDatabaseLog::xlog_flush_pending()
 
741
{
 
742
        xtLogID         req_flush_log_id;
 
743
        xtLogOffset     req_flush_log_offset;
 
744
 
 
745
        xt_lck_slock(&xl_buffer_lock);
 
746
        req_flush_log_id = xl_append_log_id;
 
747
        req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
 
748
        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
 
749
                xt_spinlock_unlock(&xl_buffer_lock);
 
750
                return FALSE;
 
751
        }
 
752
        xt_spinlock_unlock(&xl_buffer_lock);
 
753
        return TRUE;
 
754
}
 
755
 
 
756
/*
 
757
 * Write data to the end of the log buffer.
 
758
 *
 
759
 * flush_log_at_trx_commit is set to XT_XLOG_WRITE_AND_FLUSH if the caller also requires
 
760
 * the log to be flushed, after writing the data.
 
761
 *
 
762
 * This function returns the log ID and offset of
 
763
 * the data write position.
 
764
 */
 
765
xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, int flush_log_at_trx_commit, xtLogID *log_id, xtLogOffset *log_offset)
 
766
{
 
767
        int                     write_reason = 0;
 
768
        xtLogID         req_flush_log_id;
 
769
        xtLogOffset     req_flush_log_offset;
 
770
        size_t          part_size;
 
771
        xtWord8         flush_time;
 
772
        xtWord2         sum;
 
773
 
 
774
        /* The first size value must be set, if the second is set! */
 
775
        ASSERT_NS(size1 || !size2);
 
776
 
 
777
        if (!size1) {
 
778
                /* Just flush the buffer... */
 
779
                xt_lck_slock(&xl_buffer_lock);
 
780
                write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE;
 
781
                req_flush_log_id = xl_append_log_id;
 
782
                req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
 
783
                xt_spinlock_unlock(&xl_buffer_lock);
 
784
                goto write_log_to_file;
 
785
        }
 
786
        req_flush_log_id = 0;
 
787
        req_flush_log_offset = 0;
 
788
 
 
789
        /*
 
790
         * This is a dirty read, which will send us to the
 
791
         * best starting position:
 
792
         *
 
793
         * If there is space, now, then there is probably
 
794
         * still enough space, after we have locked the
 
795
         * buffer for writting.
 
796
         */
 
797
        if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
 
798
                goto copy_to_log_buffer;
 
799
 
 
800
        /*
 
801
         * There is not enough space in the append buffer.
 
802
         * So we need to write the log, until there is space.
 
803
         */
 
804
        write_reason = WR_NO_SPACE;
 
805
 
 
806
        write_log_to_file:
 
807
        if (write_reason) {
 
808
                /* We need to write for one of 2 reasons: not
 
809
                 * enough space in the buffer, or a flush
 
810
                 * is required.
 
811
                 */
 
812
                xtWord8 then;
 
813
                 
 
814
                /*
 
815
                 * The objective of the following code is to
 
816
                 * pick one writer, out of all threads.
 
817
                 * The rest will wait for the writer.
 
818
                 */
 
819
 
 
820
                if (write_reason == WR_FLUSH) {
 
821
                        /* Before we flush, check if we should wait for running
 
822
                         * transactions that may commit shortly.
 
823
                         */
 
824
                        if (xl_db->db_xn_writer_count - xl_db->db_xn_writer_wait_count - xl_db->db_xn_long_running_count > 0 && xl_last_flush_time) {
 
825
                                /* Wait for about as long as the last flush took,
 
826
                                 * the idea is to saturate the disk with flushing...: */
 
827
                                then = xt_trace_clock() + (xtWord8) xl_last_flush_time;
 
828
                                for (;;) {
 
829
                                        xt_critical_wait();
 
830
                                        /* If a thread leaves this loop because times up, or
 
831
                                         * a thread manages to flush so fast that this thread
 
832
                                         * sleeps during this time, then it could be that
 
833
                                         * the required flush occurs before other conditions
 
834
                                         * of this loop are met!
 
835
                                         *
 
836
                                         * So we check here to make sure that the log has not been
 
837
                                         * flushed as we require:
 
838
                                         */
 
839
                                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
 
840
                                                ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
 
841
                                                return OK;
 
842
                                        }
 
843
 
 
844
                                        if (xl_db->db_xn_writer_count - xl_db->db_xn_writer_wait_count - xl_db->db_xn_long_running_count > 0)
 
845
                                                break;
 
846
                                        if (xt_trace_clock() >= then)
 
847
                                                break;
 
848
                                }
 
849
                        }
 
850
                }
 
851
 
 
852
#ifdef XT_XLOG_WAIT_SPINS
 
853
                /* Spin for 1/1000s: */
 
854
                then = xt_trace_clock() + (xtWord8) 1000;
 
855
                for (;;) {
 
856
                        if (!xt_atomic_tas4(&xt_writing, 1))
 
857
                                break;
 
858
 
 
859
                        /* If I am not the writer, then I just waited for the
 
860
                         * writer. So it may be that my requirements have now
 
861
                         * been met!
 
862
                         */
 
863
                        if (write_reason == WR_FLUSH) {
 
864
                                /* If the reason was to flush, then
 
865
                                 * check the last flush sequence, maybe it is passed
 
866
                                 * our required sequence.
 
867
                                 */
 
868
                                if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
 
869
                                        /* The required flush position of the log is before
 
870
                                         * or equal to the actual flush position. This means the condition
 
871
                                         * for this thread have been satified (via group commit).
 
872
                                         * Nothing more to do!
 
873
                                         */
 
874
                                        ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
 
875
                                        return OK;
 
876
                                }
 
877
                        }
 
878
                        else if (size1) {
 
879
                                /* It may be that there is now space in the append buffer: */
 
880
                                if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
 
881
                                        goto copy_to_log_buffer;
 
882
                        }
 
883
                        else {
 
884
                                /* We are just writing the buffer! */
 
885
                                ASSERT_NS(write_reason == WR_NO_SPACE);
 
886
                                if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0)
 
887
                                        return OK;
 
888
                        }
 
889
 
 
890
                        if (xt_trace_clock() >= then) {
 
891
                                xt_lock_mutex_ns(&xl_write_lock);
 
892
                                xt_waiting++;
 
893
                                if (!xt_timed_wait_cond_ns(&xl_write_cond, &xl_write_lock, 500)) {
 
894
                                        xt_waiting--;
 
895
                                        xt_unlock_mutex_ns(&xl_write_lock);
 
896
                                        return FALSE;
 
897
                                }
 
898
                                xt_waiting--;
 
899
                                xt_unlock_mutex_ns(&xl_write_lock);
 
900
                        }
 
901
                        else
 
902
                                xt_critical_wait();
 
903
                }
 
904
#else
 
905
                xtBool i_am_writer;
 
906
 
 
907
                i_am_writer = FALSE;
 
908
                xt_lock_mutex_ns(&xl_write_lock);
 
909
                if (xt_writing) {
 
910
                        if (!xt_timed_wait_cond_ns(&xl_write_cond, &xl_write_lock, 500)) {
 
911
                                xt_unlock_mutex_ns(&xl_write_lock);
 
912
                                return FALSE;
 
913
                        }
 
914
                }
 
915
                else {
 
916
                        xt_writing = TRUE;
 
917
                        i_am_writer = TRUE;
 
918
                }
 
919
                xt_unlock_mutex_ns(&xl_write_lock);
 
920
 
 
921
                if (!i_am_writer) {
 
922
                        /* If I am not the writer, then I just waited for the
 
923
                         * writer. So it may be that my requirements have now
 
924
                         * been met!
 
925
                         */
 
926
                        if (write_reason == WR_FLUSH) {
 
927
                                /* If the reason was to flush, then
 
928
                                 * check the last flush sequence, maybe it is passed
 
929
                                 * our required sequence.
 
930
                                 */
 
931
                                if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
 
932
                                        /* The required flush position of the log is before
 
933
                                         * or equal to the actual flush position. This means the condition
 
934
                                         * for this thread have been satified (via group commit).
 
935
                                         * Nothing more to do!
 
936
                                         */
 
937
                                        ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
 
938
                                        return OK;
 
939
                                }
 
940
                        }
 
941
                        else if (size1) {
 
942
                                /* It may be that there is now space in the append buffer: */
 
943
                                if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
 
944
                                        goto copy_to_log_buffer;
 
945
                        }
 
946
                        else {
 
947
                                /* We are just writing the buffer! */
 
948
                                ASSERT_NS(write_reason == WR_NO_SPACE);
 
949
                                if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0)
 
950
                                        return OK;
 
951
                        }
 
952
                                
 
953
                        goto write_log_to_file;
 
954
                }
 
955
#endif
 
956
 
 
957
                /* I am the writer, check the conditions, again: */
 
958
                if (write_reason == WR_FLUSH) {
 
959
                        /* The writer wants the log to be flushed to a particular point: */
 
960
                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
 
961
                                /* The writers required flush position is before or equal
 
962
                                 * to the actual position, so the writer is done...
 
963
                                 */
 
964
#ifdef XT_XLOG_WAIT_SPINS
 
965
                                xt_writing = 0;
 
966
                                if (xt_waiting)
 
967
                                        xt_cond_wakeall(&xl_write_cond);
 
968
#else
 
969
                                xt_writing = FALSE;
 
970
                                xt_cond_wakeall(&xl_write_cond);
 
971
#endif
 
972
                                ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
 
973
                                return OK;
 
974
                        }
 
975
                        /* Not flushed, but what about written? */
 
976
                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) {
 
977
                                /* The write position is after or equal to the required flush
 
978
                                 * position. This means that all we have to do is flush
 
979
                                 * to satisfy the writers condition.
 
980
                                 */
 
981
                                xtBool ok = TRUE;
 
982
 
 
983
                                if (xl_log_id != xl_write_log_id)
 
984
                                        ok = xlog_open_log(xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start), thread);
 
985
 
 
986
                                if (ok) {
 
987
                                        if (xl_db->db_co_busy) {
 
988
                                                /* [(8)] Flush the compactor log. */
 
989
                                                xt_lock_mutex_ns(&xl_db->db_co_dlog_lock);
 
990
                                                ok = xl_db->db_co_thread->st_dlog_buf.dlb_flush_log(TRUE, thread);
 
991
                                                xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
 
992
                                        }
 
993
                                }
 
994
 
 
995
                                if (ok) {
 
996
                                        flush_time = thread->st_statistics.st_xlog.ts_flush_time;
 
997
                                        if ((ok = xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread))) {
 
998
                                                xl_last_flush_time = (u_int) (thread->st_statistics.st_xlog.ts_flush_time - flush_time);
 
999
                                                xl_log_bytes_flushed = xl_log_bytes_written;
 
1000
 
 
1001
                                                xt_lock_mutex_ns(&xl_db->db_wr_lock);
 
1002
                                                xl_flush_log_id = xl_write_log_id;
 
1003
                                                xl_flush_log_offset = xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start);
 
1004
                                                /*
 
1005
                                                 * We have written data to the log, wake the writer to commit
 
1006
                                                * the data to the database.
 
1007
                                                */
 
1008
                                                xlog_wr_log_written(xl_db);
 
1009
                                                xt_unlock_mutex_ns(&xl_db->db_wr_lock);
 
1010
                                        }
 
1011
                                }
 
1012
#ifdef XT_XLOG_WAIT_SPINS
 
1013
                                xt_writing = 0;
 
1014
                                if (xt_waiting)
 
1015
                                        xt_cond_wakeall(&xl_write_cond);
 
1016
#else
 
1017
                                xt_writing = FALSE;
 
1018
                                xt_cond_wakeall(&xl_write_cond);
 
1019
#endif
 
1020
                                ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
 
1021
                                return ok;
 
1022
                        }
 
1023
                }
 
1024
                else if (size1) {
 
1025
                        /* If the amounf of data to be written is 0, then we are just required
 
1026
                         * to write the transaction buffer.
 
1027
                         *
 
1028
                         * If there is space in the buffer, then we can go on
 
1029
                         * to copy our data into the buffer:
 
1030
                         */
 
1031
                        if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) {
 
1032
#ifdef XT_XLOG_WAIT_SPINS
 
1033
                                xt_writing = 0;
 
1034
                                if (xt_waiting)
 
1035
                                        xt_cond_wakeall(&xl_write_cond);
 
1036
#else
 
1037
                                xt_writing = FALSE;
 
1038
                                xt_cond_wakeall(&xl_write_cond);
 
1039
#endif
 
1040
                                goto copy_to_log_buffer;
 
1041
                        }
 
1042
                }
 
1043
                else {
 
1044
                        /* We are just writing the buffer! */
 
1045
                        ASSERT_NS(write_reason == WR_NO_SPACE);
 
1046
                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) {
 
1047
#ifdef XT_XLOG_WAIT_SPINS
 
1048
                                xt_writing = 0;
 
1049
                                if (xt_waiting)
 
1050
                                        xt_cond_wakeall(&xl_write_cond);
 
1051
#else
 
1052
                                xt_writing = FALSE;
 
1053
                                xt_cond_wakeall(&xl_write_cond);
 
1054
#endif
 
1055
                                return OK;
 
1056
                        }
 
1057
                }
 
1058
 
 
1059
                rewrite:
 
1060
                /* If the current write buffer has been written, then
 
1061
                 * switch the logs. Otherwise we must try to existing
 
1062
                 * write buffer.
 
1063
                 */
 
1064
                if (xl_write_done) {
 
1065
                        /* This means that the current write buffer has been writen,
 
1066
                         * i.e. it is empty!
 
1067
                         */
 
1068
                        xt_spinlock_lock(&xl_buffer_lock);
 
1069
                        xtWord1 *tmp_buffer = xl_write_buffer;
 
1070
 
 
1071
                        /* The write position is now the append position: */
 
1072
                        xl_write_log_id = xl_append_log_id;
 
1073
                        xl_write_log_offset = xl_append_log_offset;
 
1074
                        xl_write_buf_pos = xl_append_buf_pos;
 
1075
                        xl_write_buf_pos_start = xl_append_buf_pos_start;
 
1076
                        xl_write_buffer = xl_append_buffer;
 
1077
                        xl_write_done = FALSE;
 
1078
 
 
1079
                        /* We have to maintain 512 byte alignment: */
 
1080
                        ASSERT_NS((xl_write_log_offset % 512) == 0);
 
1081
                        part_size = xl_write_buf_pos % 512;
 
1082
                        if (part_size != 0)
 
1083
                                memcpy(tmp_buffer, xl_write_buffer + xl_write_buf_pos - part_size, part_size);
 
1084
 
 
1085
                        /* The new append position will be after the
 
1086
                         * current append position:
 
1087
                         */
 
1088
                        xl_append_log_offset += xl_append_buf_pos - part_size;
 
1089
                        xl_append_buf_pos = part_size;
 
1090
                        xl_append_buf_pos_start = part_size;
 
1091
                        xl_append_buffer = tmp_buffer; // The old write buffer (which is empty)
 
1092
 
 
1093
                        /*
 
1094
                         * If the append offset exceeds the log threshhold, then
 
1095
                         * we set the append buffer to a new log file:
 
1096
                         *
 
1097
                         * NOTE: This algorithm will cause the log to be overwriten by a maximum
 
1098
                         * of the log buffer size!
 
1099
                         */
 
1100
                        if (xl_append_log_offset >= xl_log_file_threshold) {
 
1101
                                XTXactNewLogEntryDPtr   log_tail;
 
1102
                                XTXactLogHeaderDPtr             log_head;
 
1103
 
 
1104
                                xl_append_log_id++;
 
1105
 
 
1106
                                /* Write the final record to the old log.
 
1107
                                 * There is enough space for this because we allocate the
 
1108
                                 * buffer a little bigger than required.
 
1109
                                 */
 
1110
                                log_tail = (XTXactNewLogEntryDPtr) (xl_write_buffer + xl_write_buf_pos);
 
1111
                                log_tail->xl_status_1 = XT_LOG_ENT_NEW_LOG;
 
1112
                                log_tail->xl_checksum_1 = XT_CHECKSUM_1(xl_append_log_id) ^ XT_CHECKSUM_1(xl_write_log_id);
 
1113
                                XT_SET_DISK_4(log_tail->xl_log_id_4, xl_append_log_id);
 
1114
                                xl_write_buf_pos += sizeof(XTXactNewLogEntryDRec);
 
1115
 
 
1116
                                /* We add the header to the next log. */
 
1117
                                log_head = (XTXactLogHeaderDPtr) xl_append_buffer;
 
1118
                                memset(log_head, 0, sizeof(XTXactLogHeaderDRec));
 
1119
                                log_head->xh_status_1 = XT_LOG_ENT_HEADER;
 
1120
                                log_head->xh_checksum_1 = XT_CHECKSUM_1(xl_append_log_id);
 
1121
                                XT_SET_DISK_4(log_head->xh_size_4, sizeof(XTXactLogHeaderDRec));
 
1122
                                XT_SET_DISK_4(log_head->xh_log_id_4, xl_append_log_id);
 
1123
                                XT_SET_DISK_2(log_head->xh_version_2, XT_LOG_VERSION_NO);
 
1124
                                XT_SET_DISK_4(log_head->xh_magic_4, XT_LOG_FILE_MAGIC);
 
1125
 
 
1126
                                xl_append_log_offset = 0;
 
1127
                                xl_append_buf_pos = sizeof(XTXactLogHeaderDRec);
 
1128
                                xl_append_buf_pos_start = 0;
 
1129
                        }
 
1130
                        xt_spinlock_unlock(&xl_buffer_lock);
 
1131
                        /* We have completed the switch. The append buffer is empty, and
 
1132
                         * other threads can begin to write to it.
 
1133
                         *
 
1134
                         * Meanwhile, this thread will write the write buffer...
 
1135
                         */
 
1136
                }
 
1137
 
 
1138
                /* Make sure we have the correct log open: */
 
1139
                if (xl_log_id != xl_write_log_id) {
 
1140
                        if (!xlog_open_log(xl_write_log_id, xl_write_log_offset, thread))
 
1141
                                goto write_failed;
 
1142
                }
 
1143
 
 
1144
                /* Write the buffer. */
 
1145
                /* Always write an integral number of 512 byte blocks: */
 
1146
                ASSERT_NS((xl_write_log_offset % 512) == 0);
 
1147
                if ((part_size = xl_write_buf_pos % 512)) {
 
1148
                        part_size = 512 - part_size;
 
1149
                        xl_write_buffer[xl_write_buf_pos] = XT_LOG_ENT_END_OF_LOG;
 
1150
#ifdef HAVE_valgrind
 
1151
                        if (part_size > 1)
 
1152
                                memset(xl_write_buffer + xl_write_buf_pos + 1, 0x66, part_size - 1);
 
1153
#endif
 
1154
                        if (!xt_pwrite_file(xl_log_file, xl_write_log_offset, xl_write_buf_pos+part_size, xl_write_buffer, &thread->st_statistics.st_xlog, thread))
 
1155
                                goto write_failed;                      
 
1156
                }
 
1157
                else {
 
1158
                        if (!xt_pwrite_file(xl_log_file, xl_write_log_offset, xl_write_buf_pos, xl_write_buffer, &thread->st_statistics.st_xlog, thread))
 
1159
                                goto write_failed;
 
1160
                }
 
1161
 
 
1162
                /* This part has not been written: */
 
1163
                part_size = xl_write_buf_pos - xl_write_buf_pos_start;
 
1164
 
 
1165
                /* We have written the data to the log, transfer
 
1166
                 * the buffer data into the cache. */
 
1167
                if (!xlog_transfer_to_cache(xl_log_file, xl_log_id, xl_write_log_offset+xl_write_buf_pos_start, part_size, xl_write_buffer+xl_write_buf_pos_start, thread))
 
1168
                        goto write_failed;
 
1169
 
 
1170
                xl_write_done = TRUE;
 
1171
                xl_log_bytes_written += part_size;
 
1172
 
 
1173
                if (write_reason == WR_FLUSH) {
 
1174
                        if (xl_db->db_co_busy) {
 
1175
                                /* [(8)] Flush the compactor log. */
 
1176
                                xt_lock_mutex_ns(&xl_db->db_co_dlog_lock);
 
1177
                                if (!xl_db->db_co_thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
 
1178
                                        xl_log_bytes_written -= part_size;
 
1179
                                        xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
 
1180
                                        goto write_failed;
 
1181
                                }
 
1182
                                xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
 
1183
                        }
 
1184
 
 
1185
                        /* And flush if required: */
 
1186
                        flush_time = thread->st_statistics.st_xlog.ts_flush_time;
 
1187
                        if (!xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread)) {
 
1188
                                xl_log_bytes_written -= part_size;
 
1189
                                goto write_failed;
 
1190
                        }
 
1191
                        xl_last_flush_time = (u_int) (thread->st_statistics.st_xlog.ts_flush_time - flush_time);
 
1192
 
 
1193
                        xl_log_bytes_flushed = xl_log_bytes_written;
 
1194
 
 
1195
                        xt_lock_mutex_ns(&xl_db->db_wr_lock);
 
1196
                        xl_flush_log_id = xl_write_log_id;
 
1197
                        xl_flush_log_offset = xl_write_log_offset + xl_write_buf_pos;
 
1198
                        /*
 
1199
                         * We have written data to the log, wake the writer to commit
 
1200
                         * the data to the database.
 
1201
                         */
 
1202
                        xlog_wr_log_written(xl_db);
 
1203
                        xt_unlock_mutex_ns(&xl_db->db_wr_lock);
 
1204
 
 
1205
                        /* Check that the require flush condition has arrived. */
 
1206
                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) > 0)
 
1207
                                /* The required position is still after the current flush
 
1208
                                 * position, continue writing: */
 
1209
                                goto rewrite;
 
1210
 
 
1211
#ifdef XT_XLOG_WAIT_SPINS
 
1212
                        xt_writing = 0;
 
1213
                        if (xt_waiting)
 
1214
                                xt_cond_wakeall(&xl_write_cond);
 
1215
#else
 
1216
                        xt_writing = FALSE;
 
1217
                        xt_cond_wakeall(&xl_write_cond);
 
1218
#endif
 
1219
                        ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
 
1220
                        return OK;
 
1221
                }
 
1222
                else
 
1223
                        xlog_wr_log_written(xl_db);
 
1224
 
 
1225
                /*
 
1226
                 * Check that the buffer is now available, otherwise,
 
1227
                 * switch and write again!
 
1228
                 */
 
1229
                if (xl_append_buf_pos + size1 + size2 > xl_size_of_buffers)
 
1230
                        goto rewrite;
 
1231
 
 
1232
#ifdef XT_XLOG_WAIT_SPINS
 
1233
                xt_writing = 0;
 
1234
                if (xt_waiting)
 
1235
                        xt_cond_wakeall(&xl_write_cond);
 
1236
#else
 
1237
                xt_writing = FALSE;
 
1238
                xt_cond_wakeall(&xl_write_cond);
 
1239
#endif
 
1240
 
 
1241
                if (size1 == 0)
 
1242
                        return OK;
 
1243
        }
 
1244
 
 
1245
        copy_to_log_buffer:
 
1246
        ASSERT_NS(size1);
 
1247
        xt_spinlock_lock(&xl_buffer_lock);
 
1248
        /* Now we have to check again. The check above was a dirty read!
 
1249
         */
 
1250
        if (xl_append_buf_pos + size1 + size2 > xl_size_of_buffers) {
 
1251
                xt_spinlock_unlock(&xl_buffer_lock);
 
1252
                /* Not enough space, write the buffer, and return here. */
 
1253
                write_reason = WR_NO_SPACE;
 
1254
                goto write_log_to_file;
 
1255
        }
 
1256
 
 
1257
        memcpy(xl_append_buffer + xl_append_buf_pos, data1, size1);
 
1258
        if (size2)
 
1259
                memcpy(xl_append_buffer + xl_append_buf_pos + size1, data2, size2);
 
1260
        /* Add the log ID to the checksum!
 
1261
         * This is required because log files are re-used, and we don't
 
1262
         * want the records to be valid when the log is re-used.
 
1263
         */
 
1264
        register XTXactLogBufferDPtr record;
 
1265
 
 
1266
        /*
 
1267
         * Adjust db_xn_writer_count here. It is protected by
 
1268
         * xl_buffer_lock.
 
1269
         */
 
1270
        record = (XTXactLogBufferDPtr) (xl_append_buffer + xl_append_buf_pos);
 
1271
        switch (record->xh.xh_status_1) {
 
1272
                case XT_LOG_ENT_HEADER:
 
1273
                case XT_LOG_ENT_END_OF_LOG:
 
1274
                        break;
 
1275
                case XT_LOG_ENT_REC_MODIFIED:
 
1276
                case XT_LOG_ENT_UPDATE:
 
1277
                case XT_LOG_ENT_UPDATE_BG:
 
1278
                case XT_LOG_ENT_UPDATE_FL:
 
1279
                case XT_LOG_ENT_UPDATE_FL_BG:
 
1280
                case XT_LOG_ENT_INSERT:
 
1281
                case XT_LOG_ENT_INSERT_BG:
 
1282
                case XT_LOG_ENT_INSERT_FL:
 
1283
                case XT_LOG_ENT_INSERT_FL_BG:
 
1284
                case XT_LOG_ENT_DELETE:
 
1285
                case XT_LOG_ENT_DELETE_BG:
 
1286
                case XT_LOG_ENT_DELETE_FL:
 
1287
                case XT_LOG_ENT_DELETE_FL_BG:
 
1288
                        sum = XT_GET_DISK_2(record->xu.xu_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id);
 
1289
                        XT_SET_DISK_2(record->xu.xu_checksum_2, sum);
 
1290
 
 
1291
                        if (!thread->st_xact_writer) {
 
1292
                                thread->st_xact_writer = TRUE;
 
1293
                                thread->st_xact_write_time = xt_db_approximate_time;
 
1294
                                xl_db->db_xn_writer_count++;
 
1295
                                xl_db->db_xn_total_writer_count++;
 
1296
                        }
 
1297
                        break;
 
1298
                case XT_LOG_ENT_REC_REMOVED_BI:
 
1299
                case XT_LOG_ENT_REC_REMOVED_BI_L:
 
1300
                        sum = XT_GET_DISK_2(record->rb.rb_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id);
 
1301
                        XT_SET_DISK_2(record->rb.rb_checksum_2, sum);
 
1302
                        break;
 
1303
                case XT_LOG_ENT_ROW_NEW:
 
1304
                case XT_LOG_ENT_ROW_NEW_FL:
 
1305
                        record->xl.xl_checksum_1 ^= XT_CHECKSUM_1(xl_append_log_id);
 
1306
 
 
1307
                        if (!thread->st_xact_writer) {
 
1308
                                thread->st_xact_writer = TRUE;
 
1309
                                thread->st_xact_write_time = xt_db_approximate_time;
 
1310
                                xl_db->db_xn_writer_count++;
 
1311
                                xl_db->db_xn_total_writer_count++;
 
1312
                        }
 
1313
                        break;
 
1314
                case XT_LOG_ENT_COMMIT:
 
1315
                case XT_LOG_ENT_ABORT:
 
1316
                        ASSERT_NS(thread->st_xact_writer);
 
1317
                        ASSERT_NS(xl_db->db_xn_writer_count > 0);
 
1318
                        if (thread->st_xact_writer) {
 
1319
                                xl_db->db_xn_writer_count--;
 
1320
                                thread->st_xact_writer = FALSE;
 
1321
                                if (thread->st_xact_long_running) {
 
1322
                                        xl_db->db_xn_long_running_count--;
 
1323
                                        thread->st_xact_long_running = FALSE;
 
1324
                                }
 
1325
                        }
 
1326
                        /* No break required! */
 
1327
                default:
 
1328
                        record->xl.xl_checksum_1 ^= XT_CHECKSUM_1(xl_append_log_id);
 
1329
                        break;
 
1330
        }
 
1331
#ifdef DEBUG
 
1332
        ASSERT_NS(xlog_verify(record, size1 + size2, xl_append_log_id));
 
1333
#endif
 
1334
        if (log_id)
 
1335
                *log_id = xl_append_log_id;
 
1336
        if (log_offset)
 
1337
                *log_offset = xl_append_log_offset + xl_append_buf_pos;
 
1338
        xl_append_buf_pos += size1 + size2;
 
1339
        if (flush_log_at_trx_commit != XT_XLOG_NO_WRITE_NO_FLUSH) {
 
1340
                write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE;
 
1341
                req_flush_log_id = xl_append_log_id;
 
1342
                req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
 
1343
                xt_spinlock_unlock(&xl_buffer_lock);
 
1344
                /* We have written the data already! */
 
1345
                size1 = 0;
 
1346
                size2 = 0;
 
1347
                goto write_log_to_file;
 
1348
        }
 
1349
 
 
1350
        // Failed sometime when outside the spinlock!
 
1351
        ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset + xl_append_buf_pos) <= 0); 
 
1352
        xt_spinlock_unlock(&xl_buffer_lock);
 
1353
 
 
1354
        return OK;
 
1355
 
 
1356
        write_failed:
 
1357
#ifdef XT_XLOG_WAIT_SPINS
 
1358
        xt_writing = 0;
 
1359
        if (xt_waiting)
 
1360
                xt_cond_wakeall(&xl_write_cond);
 
1361
#else
 
1362
        xt_writing = FALSE;
 
1363
        xt_cond_wakeall(&xl_write_cond);
 
1364
#endif
 
1365
        return FAILED;
 
1366
}
 
1367
 
 
1368
/*
 
1369
 * This function does not always delete the log. It may just rename a
 
1370
 * log to a new log which it will need.
 
1371
 * This speeds things up:
 
1372
 *
 
1373
 * - No need to pre-allocate the new log.
 
1374
 * - Log data is already flushed (i.e. disk blocks allocated)
 
1375
 * - Log is already in OS cache.
 
1376
 *
 
1377
 * However, it means that I need to checksum things differently
 
1378
 * on each log to make sure I do not treat an old record
 
1379
 * as valid!
 
1380
 *
 
1381
 * Return OK, FAILED or XT_ERR
 
1382
 */ 
 
1383
int XTDatabaseLog::xlog_delete_log(xtLogID del_log_id, XTThreadPtr thread)
 
1384
{
 
1385
        char    path[PATH_MAX];
 
1386
 
 
1387
        if (xl_max_log_id < xl_write_log_id)
 
1388
                xl_max_log_id = xl_write_log_id;
 
1389
 
 
1390
        xlog_name(PATH_MAX, path, del_log_id);
 
1391
 
 
1392
        if (xt_db_offline_log_function == XT_RECYCLE_LOGS) {
 
1393
                char    new_path[PATH_MAX];
 
1394
                xtLogID new_log_id;
 
1395
                xtBool  ok;
 
1396
 
 
1397
                /* Make sure that the total logs is less than or equal to the log file count
 
1398
                 * (plus dynamic component).
 
1399
                 */
 
1400
                while (xl_max_log_id - del_log_id + 1 <= (xl_log_file_count + xt_log_file_dyn_count) &&
 
1401
                        /* And the number of logs after the current log (including the current log)
 
1402
                         * must be less or equal to the log file count. */
 
1403
                        xl_max_log_id - xl_write_log_id + 1 <= xl_log_file_count) {
 
1404
                        new_log_id = xl_max_log_id+1;
 
1405
                        xlog_name(PATH_MAX, new_path, new_log_id);
 
1406
                        ok = xt_fs_rename(NULL, path, new_path);
 
1407
                        if (ok) {
 
1408
                                xl_max_log_id = new_log_id;
 
1409
                                goto done;
 
1410
                        }
 
1411
                        if (!xt_fs_exists(new_path)) {
 
1412
                                /* Try again later: */
 
1413
                                if (thread->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
 
1414
                                        XT_FILE_IN_USE(thread->t_exception.e_sys_err))
 
1415
                                        return FAILED;
 
1416
 
 
1417
                                return XT_ERR;
 
1418
                        }
 
1419
                        xl_max_log_id = new_log_id;
 
1420
                }
 
1421
        }
 
1422
 
 
1423
        if (xt_db_offline_log_function != XT_KEEP_LOGS) {
 
1424
                if (!xt_fs_delete(NULL, path)) {
 
1425
                        if (thread->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
 
1426
                                XT_FILE_IN_USE(thread->t_exception.e_sys_err))
 
1427
                                return FAILED;
 
1428
 
 
1429
                        return XT_ERR;
 
1430
                }
 
1431
        }
 
1432
 
 
1433
        done:
 
1434
        return OK;
 
1435
}
 
1436
 
 
1437
/* PRIVATE FUNCTIONS */
 
1438
xtBool XTDatabaseLog::xlog_open_log(xtLogID log_id, off_t curr_write_pos, XTThreadPtr thread)
 
1439
{
 
1440
        char    log_path[PATH_MAX];
 
1441
        off_t   eof;
 
1442
 
 
1443
        if (xl_log_id == log_id)
 
1444
                return OK;
 
1445
 
 
1446
        if (xl_log_file) {
 
1447
                if (!xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread))
 
1448
                        return FAILED;
 
1449
                xt_close_file_ns(xl_log_file);
 
1450
                xl_log_file = NULL;
 
1451
                xl_log_id = 0;
 
1452
        }
 
1453
 
 
1454
        xlog_name(PATH_MAX, log_path, log_id);
 
1455
        if (!(xl_log_file = xt_open_file_ns(log_path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 16*1024*1024)))
 
1456
                return FAILED;
 
1457
        /* Allocate space until the required size: */
 
1458
        if (curr_write_pos <  xl_log_file_threshold) {
 
1459
                eof = xt_seek_eof_file(NULL, xl_log_file);
 
1460
                if (eof == 0) {
 
1461
                        /* A new file (bad), we need a greater file count: */
 
1462
                        xt_log_file_dyn_count++;
 
1463
                        xt_log_file_dyn_dec = 4;
 
1464
                }
 
1465
                else {
 
1466
                        /* An existing file (good): */
 
1467
                        if (xt_log_file_dyn_count > 0) {
 
1468
                                if (xt_log_file_dyn_dec > 0)
 
1469
                                        xt_log_file_dyn_dec--;
 
1470
                                else
 
1471
                                        xt_log_file_dyn_count--;
 
1472
                        }
 
1473
                }
 
1474
                if (eof < xl_log_file_threshold) {
 
1475
                        char    buffer[2048];
 
1476
                        size_t  tfer;
 
1477
 
 
1478
                        memset(buffer, 0, 2048);
 
1479
 
 
1480
                        curr_write_pos = xt_align_offset(curr_write_pos, 512);
 
1481
#ifdef PREWRITE_LOG_COMPLETELY
 
1482
                        while (curr_write_pos < xl_log_file_threshold) {
 
1483
                                tfer = 2048;
 
1484
                                if ((off_t) tfer > xl_log_file_threshold - curr_write_pos)
 
1485
                                        tfer = (size_t) (xl_log_file_threshold - curr_write_pos);
 
1486
                                if (curr_write_pos == 0)
 
1487
                                        *buffer = XT_LOG_ENT_END_OF_LOG;
 
1488
                                if (!xt_pwrite_file(xl_log_file, curr_write_pos, tfer, buffer, &thread->st_statistics.st_xlog, thread))
 
1489
                                        return FAILED;
 
1490
                                *buffer = 0;
 
1491
                                curr_write_pos += tfer;
 
1492
                        }
 
1493
#else
 
1494
                        if (curr_write_pos < xl_log_file_threshold) {
 
1495
                                tfer = 2048;
 
1496
                                
 
1497
                                if (curr_write_pos < xl_log_file_threshold - 2048)
 
1498
                                        curr_write_pos = xl_log_file_threshold - 2048;
 
1499
                                if ((off_t) tfer > xl_log_file_threshold - curr_write_pos)
 
1500
                                        tfer = (size_t) (xl_log_file_threshold - curr_write_pos);
 
1501
                                if (!xt_pwrite_file(xl_log_file, curr_write_pos, tfer, buffer, &thread->st_statistics.st_xlog, thread))
 
1502
                                        return FAILED;
 
1503
                        }
 
1504
#endif
 
1505
                }
 
1506
                else if (eof > xl_log_file_threshold + (128 * 1024 * 1024)) {
 
1507
                        if (!xt_set_eof_file(NULL, xl_log_file, xl_log_file_threshold))
 
1508
                                return FAILED;
 
1509
                }
 
1510
        }
 
1511
        xl_log_id = log_id;
 
1512
        return OK;
 
1513
}
 
1514
 
 
1515
void XTDatabaseLog::xlog_name(size_t size, char *path, xtLogID log_id)
 
1516
{
 
1517
        char name[50];
 
1518
 
 
1519
        sprintf(name, "xlog-%lu.xt", (u_long) log_id);
 
1520
        xt_strcpy(size, path, xl_db->db_main_path);
 
1521
        xt_add_system_dir(size, path);
 
1522
        xt_add_dir_char(size, path);
 
1523
        xt_strcat(size, path, name);
 
1524
}
 
1525
 
 
1526
/*
 
1527
 * -----------------------------------------------------------------------
 
1528
 * T H R E A D   T R A N S A C T I O N   B U F F E R
 
1529
 */
 
1530
 
 
1531
xtPublic xtBool xt_xlog_flush_log(struct XTDatabase *db, XTThreadPtr thread)
 
1532
{
 
1533
        return db->db_xlog.xlog_flush(thread);
 
1534
}
 
1535
 
 
1536
xtPublic xtBool xt_xlog_log_data(XTThreadPtr thread, size_t size, XTXactLogBufferDPtr log_entry, int flush_log_at_trx_commit)
 
1537
{
 
1538
        return thread->st_database->db_xlog.xlog_append(thread, size, (xtWord1 *) log_entry, 0, NULL, flush_log_at_trx_commit, NULL, NULL);
 
1539
}
 
1540
 
 
1541
/* Allocate a record from the free list. */
 
1542
xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo op_seq, xtWord1 new_rec_type, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *data, XTThreadPtr thread)
 
1543
{
 
1544
        XTXactLogBufferDRec     log_entry;
 
1545
        size_t                          len;
 
1546
        xtWord4                         sum = 0;
 
1547
        int                                     check_size = 1;
 
1548
        XTXactDataPtr           xact = NULL;
 
1549
        int                                     flush_log_at_trx_commit = XT_XLOG_NO_WRITE_NO_FLUSH;
 
1550
 
 
1551
        switch (status) {
 
1552
                case XT_LOG_ENT_REC_MODIFIED:
 
1553
                case XT_LOG_ENT_UPDATE:
 
1554
                case XT_LOG_ENT_INSERT:
 
1555
                case XT_LOG_ENT_DELETE:
 
1556
                        check_size = 2;
 
1557
                        XT_SET_DISK_4(log_entry.xu.xu_op_seq_4, op_seq);
 
1558
                        XT_SET_DISK_4(log_entry.xu.xu_tab_id_4, tab_id);
 
1559
                        XT_SET_DISK_4(log_entry.xu.xu_rec_id_4, rec_id);
 
1560
                        XT_SET_DISK_2(log_entry.xu.xu_size_2, size);
 
1561
                        len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
 
1562
                        if (!(thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
 
1563
                                /* Add _BG: */
 
1564
                                status++;
 
1565
                                xact = thread->st_xact_data;
 
1566
                                xact->xd_flags |= XT_XN_XAC_LOGGED;
 
1567
                        }
 
1568
                        break;
 
1569
                case XT_LOG_ENT_UPDATE_FL:
 
1570
                case XT_LOG_ENT_INSERT_FL:
 
1571
                case XT_LOG_ENT_DELETE_FL:
 
1572
                        check_size = 2;
 
1573
                        XT_SET_DISK_4(log_entry.xf.xf_op_seq_4, op_seq);
 
1574
                        XT_SET_DISK_4(log_entry.xf.xf_tab_id_4, tab_id);
 
1575
                        XT_SET_DISK_4(log_entry.xf.xf_rec_id_4, rec_id);
 
1576
                        XT_SET_DISK_2(log_entry.xf.xf_size_2, size);
 
1577
                        XT_SET_DISK_4(log_entry.xf.xf_free_rec_id_4, free_rec_id);
 
1578
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
 
1579
                        len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1);
 
1580
                        if (!(thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
 
1581
                                /* Add _BG: */
 
1582
                                status++;
 
1583
                                xact = thread->st_xact_data;
 
1584
                                xact->xd_flags |= XT_XN_XAC_LOGGED;
 
1585
                        }
 
1586
                        break;
 
1587
                case XT_DEFUNKT_REC_FREED:
 
1588
                case XT_DEFUNKT_REC_REMOVED:
 
1589
                case XT_DEFUNKT_REC_REMOVED_EXT:
 
1590
                        ASSERT_NS(size == 1 + XT_XACT_ID_SIZE + sizeof(XTTabRecFreeDRec));
 
1591
                        XT_SET_DISK_4(log_entry.fr.fr_op_seq_4, op_seq);
 
1592
                        XT_SET_DISK_4(log_entry.fr.fr_tab_id_4, tab_id);
 
1593
                        XT_SET_DISK_4(log_entry.fr.fr_rec_id_4, rec_id);
 
1594
                        len = offsetof(XTactFreeRecEntryDRec, fr_stat_id_1);
 
1595
                        break;
 
1596
                case XT_LOG_ENT_REC_REMOVED_BI:
 
1597
                        check_size = 2;
 
1598
                        XT_SET_DISK_4(log_entry.rb.rb_op_seq_4, op_seq);
 
1599
                        XT_SET_DISK_4(log_entry.rb.rb_tab_id_4, tab_id);
 
1600
                        XT_SET_DISK_4(log_entry.rb.rb_rec_id_4, rec_id);
 
1601
                        XT_SET_DISK_2(log_entry.rb.rb_size_2, size);
 
1602
                        log_entry.rb.rb_new_rec_type_1 = new_rec_type;
 
1603
                        sum ^= XT_CHECKSUM4_REC((xtWord4) new_rec_type);
 
1604
                        len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1);
 
1605
                        break;
 
1606
                case XT_LOG_ENT_REC_REMOVED_BI_L:
 
1607
                        check_size = 2;
 
1608
                        XT_SET_DISK_4(log_entry.bl.bl_op_seq_4, op_seq);
 
1609
                        XT_SET_DISK_4(log_entry.bl.bl_tab_id_4, tab_id);
 
1610
                        XT_SET_DISK_4(log_entry.bl.bl_rec_id_4, rec_id);
 
1611
                        XT_SET_DISK_2(log_entry.bl.bl_size_2, size);
 
1612
                        XT_SET_DISK_4(log_entry.bl.bl_prev_rec_id_4, free_rec_id);
 
1613
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
 
1614
                        log_entry.bl.bl_new_rec_type_1 = new_rec_type;
 
1615
                        sum ^= XT_CHECKSUM4_REC((xtWord4) new_rec_type);
 
1616
                        len = offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1);
 
1617
                        break;
 
1618
                case XT_LOG_ENT_REC_MOVED:
 
1619
                        ASSERT_NS(size == 8);
 
1620
                        XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
 
1621
                        XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
 
1622
                        XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
 
1623
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
 
1624
                        break;
 
1625
                case XT_LOG_ENT_REC_CLEANED:
 
1626
                        ASSERT_NS(size == offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE);
 
1627
                        XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
 
1628
                        XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
 
1629
                        XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
 
1630
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
 
1631
                        break;
 
1632
                case XT_LOG_ENT_REC_CLEANED_1:
 
1633
                        ASSERT_NS(size == 1);
 
1634
                        XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
 
1635
                        XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
 
1636
                        XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
 
1637
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
 
1638
                        break;
 
1639
                case XT_LOG_ENT_REC_UNLINKED:
 
1640
                        ASSERT_NS(size == offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE);
 
1641
                        XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
 
1642
                        XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
 
1643
                        XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
 
1644
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
 
1645
                        break;
 
1646
                case XT_LOG_ENT_ROW_NEW:
 
1647
                        ASSERT_NS(size == 0);
 
1648
                        XT_SET_DISK_4(log_entry.xa.xa_op_seq_4, op_seq);
 
1649
                        XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab_id);
 
1650
                        XT_SET_DISK_4(log_entry.xa.xa_row_id_4, rec_id);
 
1651
                        len = offsetof(XTactRowAddedEntryDRec, xa_row_id_4) + XT_ROW_ID_SIZE;
 
1652
                        break;
 
1653
                case XT_LOG_ENT_ROW_NEW_FL:
 
1654
                        ASSERT_NS(size == 0);
 
1655
                        XT_SET_DISK_4(log_entry.xa.xa_op_seq_4, op_seq);
 
1656
                        XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab_id);
 
1657
                        XT_SET_DISK_4(log_entry.xa.xa_row_id_4, rec_id);
 
1658
                        XT_SET_DISK_4(log_entry.xa.xa_free_list_4, free_rec_id);
 
1659
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
 
1660
                        len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4) + XT_ROW_ID_SIZE;
 
1661
                        break;
 
1662
                case XT_LOG_ENT_ROW_ADD_REC:
 
1663
                case XT_LOG_ENT_ROW_SET:
 
1664
                case XT_LOG_ENT_ROW_FREED:
 
1665
                        ASSERT_NS(size == sizeof(XTTabRowRefDRec));
 
1666
                        XT_SET_DISK_4(log_entry.wr.wr_op_seq_4, op_seq);
 
1667
                        XT_SET_DISK_4(log_entry.wr.wr_tab_id_4, tab_id);
 
1668
                        XT_SET_DISK_4(log_entry.wr.wr_row_id_4, rec_id);
 
1669
                        len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4);
 
1670
                        break;
 
1671
                case XT_LOG_ENT_PREPARE:
 
1672
                        check_size = 2;
 
1673
                        XT_SET_DISK_4(log_entry.xp.xp_xact_id_4, op_seq);
 
1674
                        log_entry.xp.xp_xa_len_1 = (xtWord1) size;
 
1675
                        len = offsetof(XTXactPrepareEntryDRec, xp_xa_data);
 
1676
                        flush_log_at_trx_commit = xt_db_flush_log_at_trx_commit;
 
1677
                        break;
 
1678
                default:
 
1679
                        ASSERT_NS(FALSE);
 
1680
                        len = 0;
 
1681
                        break;
 
1682
        }
 
1683
 
 
1684
        xtWord1 *dptr = data;
 
1685
        xtWord4 g;
 
1686
 
 
1687
        sum ^= op_seq ^ (tab_id << 8) ^ XT_CHECKSUM4_REC(rec_id);
 
1688
        if ((g = sum & 0xF0000000)) {
 
1689
                sum = sum ^ (g >> 24);
 
1690
                sum = sum ^ g;
 
1691
        }
 
1692
        for (u_int i=0; i<(u_int) size; i++) {
 
1693
                sum = (sum << 4) + *dptr;
 
1694
                if ((g = sum & 0xF0000000)) {
 
1695
                        sum = sum ^ (g >> 24);
 
1696
                        sum = sum ^ g;
 
1697
                }
 
1698
                dptr++;
 
1699
        }
 
1700
 
 
1701
        log_entry.xh.xh_status_1 = status;
 
1702
        if (check_size == 1) {
 
1703
                log_entry.xh.xh_checksum_1 = XT_CHECKSUM_1(sum);
 
1704
        }
 
1705
        else {
 
1706
                xtWord2 c;
 
1707
                
 
1708
                c = XT_CHECKSUM_2(sum);
 
1709
                XT_SET_DISK_2(log_entry.xu.xu_checksum_2, c);
 
1710
        }
 
1711
#ifdef PRINT_TABLE_MODIFICATIONS
 
1712
        xt_print_log_record(0, 0, &log_entry);
 
1713
#endif
 
1714
        if (xact)
 
1715
                return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, &xact->xd_begin_log, &xact->xd_begin_offset);
 
1716
 
 
1717
        return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, NULL, NULL);
 
1718
}
 
1719
 
 
1720
/*
 
1721
 * -----------------------------------------------------------------------
 
1722
 * S E Q U E N T I A L   L O G   R E A D I N G
 
1723
 */
 
1724
 
 
1725
/*
 
1726
 * Use the log buffer for sequential reading the log.
 
1727
 */
 
1728
xtBool XTDatabaseLog::xlog_seq_init(XTXactSeqReadPtr seq, size_t buffer_size, xtBool load_cache)
 
1729
{
 
1730
        seq->xseq_buffer_size = buffer_size;
 
1731
        seq->xseq_load_cache = load_cache;
 
1732
 
 
1733
        seq->xseq_log_id = 0;
 
1734
        seq->xseq_log_file = NULL;
 
1735
        seq->xseq_log_eof = 0;
 
1736
 
 
1737
        seq->xseq_buf_log_offset = 0;
 
1738
        seq->xseq_buffer_len = 0;
 
1739
        seq->xseq_buffer = (xtWord1 *) xt_malloc_ns(buffer_size);
 
1740
 
 
1741
        seq->xseq_rec_log_id = 0;
 
1742
        seq->xseq_rec_log_offset = 0;
 
1743
        seq->xseq_record_len = 0;
 
1744
 
 
1745
        return seq->xseq_buffer != NULL;
 
1746
}
 
1747
 
 
1748
void XTDatabaseLog::xlog_seq_exit(XTXactSeqReadPtr seq)
 
1749
{
 
1750
        xlog_seq_close(seq);
 
1751
        if (seq->xseq_buffer) {
 
1752
                xt_free_ns(seq->xseq_buffer);
 
1753
                seq->xseq_buffer = NULL;
 
1754
        }
 
1755
}
 
1756
 
 
1757
void XTDatabaseLog::xlog_seq_close(XTXactSeqReadPtr seq)
 
1758
{
 
1759
        if (seq->xseq_log_file) {
 
1760
                xt_close_file_ns(seq->xseq_log_file);
 
1761
                seq->xseq_log_file = NULL;
 
1762
        }
 
1763
        seq->xseq_log_id = 0;
 
1764
        seq->xseq_log_eof = 0;
 
1765
}
 
1766
 
 
1767
xtBool XTDatabaseLog::xlog_seq_start(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, xtBool XT_UNUSED(missing_ok))
 
1768
{
 
1769
        if (seq->xseq_rec_log_id != log_id) {
 
1770
                seq->xseq_rec_log_id = log_id;
 
1771
                seq->xseq_buf_log_offset = seq->xseq_rec_log_offset;
 
1772
                seq->xseq_buffer_len = 0;
 
1773
        }
 
1774
 
 
1775
        /* Windows version: this will help to switch
 
1776
         * to the new log file.
 
1777
         * Due to reading from the log buffers, this was
 
1778
         * not always done!
 
1779
         */
 
1780
        if (seq->xseq_log_id != log_id) {
 
1781
                if (seq->xseq_log_file) {
 
1782
                        xt_close_file_ns(seq->xseq_log_file);
 
1783
                        seq->xseq_log_file = NULL;
 
1784
                }
 
1785
        }
 
1786
        seq->xseq_rec_log_offset = log_offset;
 
1787
        seq->xseq_record_len = 0;
 
1788
        return OK;
 
1789
}
 
1790
 
 
1791
size_t XTDatabaseLog::xlog_bytes_to_write()
 
1792
{
 
1793
        xtLogID                                 log_id;
 
1794
        xtLogOffset                             log_offset;
 
1795
        xtLogID                                 to_log_id;
 
1796
        xtLogOffset                             to_log_offset;
 
1797
        size_t                                  byte_count = 0;
 
1798
 
 
1799
        log_id = xl_db->db_wr_log_id;
 
1800
        log_offset = xl_db->db_wr_log_offset;
 
1801
        to_log_id = xl_db->db_xlog.xl_flush_log_id;
 
1802
        to_log_offset = xl_db->db_xlog.xl_flush_log_offset;
 
1803
 
 
1804
        /* Assume the logs have the threshold: */
 
1805
        if (log_id < to_log_id) {
 
1806
                if (log_offset < xt_db_log_file_threshold)
 
1807
                        byte_count = (size_t) (xt_db_log_file_threshold - log_offset);
 
1808
                log_offset = 0;
 
1809
                log_id++;
 
1810
        }
 
1811
        while (log_id < to_log_id) {
 
1812
                byte_count += (size_t) xt_db_log_file_threshold;
 
1813
                log_id++;
 
1814
        }
 
1815
        if (log_offset < to_log_offset)
 
1816
                byte_count += (size_t) (to_log_offset - log_offset);
 
1817
 
 
1818
        return byte_count;
 
1819
}
 
1820
 
 
1821
xtBool XTDatabaseLog::xlog_read_from_cache(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, size_t size, off_t eof, xtWord1 *buffer, size_t *data_read, XTThreadPtr thread)
 
1822
{
 
1823
        /* xseq_log_file could be NULL because xseq_log_id is not set
 
1824
         * to zero when xseq_log_file is set to NULL!
 
1825
         * This bug caused a crash in TeamDrive.
 
1826
         */
 
1827
        if (seq->xseq_log_id != log_id || !seq->xseq_log_file) {
 
1828
                char path[PATH_MAX];
 
1829
 
 
1830
                if (seq->xseq_log_file) {
 
1831
                        xt_close_file_ns(seq->xseq_log_file);
 
1832
                        seq->xseq_log_file = NULL;
 
1833
                }
 
1834
 
 
1835
                xlog_name(PATH_MAX, path, log_id);
 
1836
                if (!xt_open_file_ns(&seq->xseq_log_file, path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))
 
1837
                        return FAILED;
 
1838
                if (!seq->xseq_log_file) {
 
1839
                        if (data_read)
 
1840
                                *data_read = 0;
 
1841
                        return OK;
 
1842
                }
 
1843
                seq->xseq_log_id = log_id;
 
1844
                seq->xseq_log_eof = 0;
 
1845
        }
 
1846
 
 
1847
        if (!eof) {
 
1848
                if (!seq->xseq_log_eof)
 
1849
                        seq->xseq_log_eof = xt_seek_eof_file(NULL, seq->xseq_log_file);
 
1850
                eof = seq->xseq_log_eof;
 
1851
        }
 
1852
 
 
1853
        if (log_offset >= eof) {
 
1854
                if (data_read)
 
1855
                        *data_read = 0;
 
1856
                return OK;
 
1857
        }
 
1858
 
 
1859
        if ((off_t) size > eof - log_offset)
 
1860
                size = (size_t) (eof - log_offset);
 
1861
 
 
1862
        if (data_read)
 
1863
                *data_read = size;
 
1864
        return xt_xlog_read(seq->xseq_log_file, seq->xseq_log_id, log_offset, size, buffer, seq->xseq_load_cache, thread);
 
1865
}
 
1866
 
 
1867
xtBool XTDatabaseLog::xlog_rnd_read(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, size_t size, xtWord1 *buffer, size_t *data_read, XTThreadPtr thread)
 
1868
{
 
1869
        /* Fast track to reading from cache: */
 
1870
        if (log_id < xl_write_log_id)
 
1871
                return xlog_read_from_cache(seq, log_id, log_offset, size, 0, buffer, data_read, thread);
 
1872
        
 
1873
        if (log_id == xl_write_log_id && log_offset + (xtLogOffset) size <= xl_write_log_offset)
 
1874
                return xlog_read_from_cache(seq, log_id, log_offset, size, xl_write_log_offset, buffer, data_read, thread);
 
1875
 
 
1876
        /* May be in the log write or append buffer: */
 
1877
        xt_lck_slock(&xl_buffer_lock);
 
1878
 
 
1879
        if (log_id < xl_write_log_id) {
 
1880
                xt_spinlock_unlock(&xl_buffer_lock);
 
1881
                return xlog_read_from_cache(seq, log_id, log_offset, size, 0, buffer, data_read, thread);
 
1882
        }
 
1883
 
 
1884
        /* Check the write buffer: */
 
1885
        if (log_id == xl_write_log_id) {
 
1886
                if (log_offset + (xtLogOffset) size <= xl_write_log_offset) {
 
1887
                        xt_spinlock_unlock(&xl_buffer_lock);
 
1888
                        return xlog_read_from_cache(seq, log_id, log_offset, size, xl_write_log_offset, buffer, data_read, thread);
 
1889
                }
 
1890
 
 
1891
                if (log_offset < xl_write_log_offset + (xtLogOffset) xl_write_buf_pos) {
 
1892
                        /* Reading partially from the write buffer: */
 
1893
                        if (log_offset >= xl_write_log_offset) {
 
1894
                                /* Completely in the buffer. */
 
1895
                                off_t offset = log_offset - xl_write_log_offset;
 
1896
                                
 
1897
                                if (size > xl_write_buf_pos - offset)
 
1898
                                        size = (size_t) (xl_write_buf_pos - offset);
 
1899
                                
 
1900
                                memcpy(buffer, xl_write_buffer + offset, size);
 
1901
                                if (data_read)
 
1902
                                        *data_read = size;
 
1903
                                goto unlock_and_return;
 
1904
                        }
 
1905
 
 
1906
                        /* End part in the buffer: */
 
1907
                        size_t tfer;
 
1908
                        
 
1909
                        /* The amount that will be taken from the cache: */
 
1910
                        tfer = (size_t) (xl_write_log_offset - log_offset);
 
1911
                        
 
1912
                        size -= tfer;
 
1913
                        if (size > xl_write_buf_pos)
 
1914
                                size = xl_write_buf_pos;
 
1915
                        
 
1916
                        memcpy(buffer + tfer, xl_write_buffer, size);
 
1917
 
 
1918
                        xt_spinlock_unlock(&xl_buffer_lock);
 
1919
                        
 
1920
                        /* Read the first part from the cache: */
 
1921
                        if (data_read)
 
1922
                                *data_read = tfer + size;                       
 
1923
                        return xlog_read_from_cache(seq, log_id, log_offset, tfer, log_offset + tfer, buffer, NULL, thread);
 
1924
                }
 
1925
        }
 
1926
 
 
1927
        /* Check the append buffer: */
 
1928
        if (log_id == xl_append_log_id) {
 
1929
                if (log_offset >= xl_append_log_offset && log_offset < xl_append_log_offset + (xtLogOffset) xl_append_buf_pos) {
 
1930
                        /* It is in the append buffer: */
 
1931
                        size_t offset = (size_t) (log_offset - xl_append_log_offset);
 
1932
                        
 
1933
                        if (size > xl_append_buf_pos - offset)
 
1934
                                size = xl_append_buf_pos - offset;
 
1935
                        
 
1936
                        memcpy(buffer, xl_append_buffer + offset, size);
 
1937
                        if (data_read)
 
1938
                                *data_read = size;
 
1939
                        goto unlock_and_return;
 
1940
                }
 
1941
        }
 
1942
 
 
1943
        if (xl_append_log_id == 0) {
 
1944
                /* This catches the case that
 
1945
                 * the log has not yet been initialized
 
1946
                 * for writing.
 
1947
                 */
 
1948
                xt_spinlock_unlock(&xl_buffer_lock);
 
1949
                return xlog_read_from_cache(seq, log_id, log_offset, size, 0, buffer, data_read, thread);
 
1950
        }
 
1951
 
 
1952
        if (data_read)
 
1953
                *data_read = 0;
 
1954
 
 
1955
        unlock_and_return:
 
1956
        xt_spinlock_unlock(&xl_buffer_lock);
 
1957
        return OK;
 
1958
}
 
1959
 
 
1960
xtBool XTDatabaseLog::xlog_write_thru(XTXactSeqReadPtr seq, size_t size, xtWord1 *data, XTThreadPtr thread)
 
1961
{
 
1962
        if (!xt_xlog_write(seq->xseq_log_file, seq->xseq_log_id, seq->xseq_rec_log_offset, size, data, thread))
 
1963
                return FALSE;
 
1964
        xl_log_bytes_written += size;
 
1965
        seq->xseq_rec_log_offset += size;
 
1966
        return TRUE;
 
1967
}
 
1968
 
 
1969
xtBool XTDatabaseLog::xlog_verify(XTXactLogBufferDPtr record, size_t rec_size, xtLogID log_id)
 
1970
{
 
1971
        xtWord4         sum = 0;
 
1972
        xtOpSeqNo       op_seq;
 
1973
        xtTableID       tab_id;
 
1974
        xtRecordID      rec_id, free_rec_id;
 
1975
        int                     check_size = 1;
 
1976
        xtWord1         *dptr;
 
1977
        xtWord4         g;
 
1978
 
 
1979
        switch (record->xh.xh_status_1) {
 
1980
                case XT_LOG_ENT_HEADER:
 
1981
                        if (record->xh.xh_checksum_1 != XT_CHECKSUM_1(log_id))
 
1982
                                return FALSE;
 
1983
                        if (XT_LOG_HEAD_MAGIC(record, rec_size) != XT_LOG_FILE_MAGIC)
 
1984
                                return FALSE;
 
1985
                        if (rec_size >= offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
 
1986
                                if (XT_GET_DISK_4(record->xh.xh_log_id_4) != log_id)
 
1987
                                        return FALSE;
 
1988
                        }
 
1989
                        return TRUE;
 
1990
                case XT_LOG_ENT_NEW_LOG:
 
1991
                case XT_LOG_ENT_DEL_LOG:
 
1992
                        return record->xl.xl_checksum_1 == (XT_CHECKSUM_1(XT_GET_DISK_4(record->xl.xl_log_id_4)) ^ XT_CHECKSUM_1(log_id));
 
1993
                case XT_LOG_ENT_NEW_TAB:
 
1994
                        return record->xl.xl_checksum_1 == (XT_CHECKSUM_1(XT_GET_DISK_4(record->xt.xt_tab_id_4)) ^ XT_CHECKSUM_1(log_id));
 
1995
                case XT_LOG_ENT_COMMIT:
 
1996
                case XT_LOG_ENT_ABORT:
 
1997
                        sum = XT_CHECKSUM4_XACT(XT_GET_DISK_4(record->xe.xe_xact_id_4)) ^ XT_CHECKSUM4_XACT(XT_GET_DISK_4(record->xe.xe_not_used_4));
 
1998
                        return record->xe.xe_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
 
1999
                case XT_LOG_ENT_CLEANUP:
 
2000
                        sum = XT_CHECKSUM4_XACT(XT_GET_DISK_4(record->xc.xc_xact_id_4));
 
2001
                        return record->xc.xc_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
 
2002
                case XT_LOG_ENT_REC_MODIFIED:
 
2003
                case XT_LOG_ENT_UPDATE:
 
2004
                case XT_LOG_ENT_INSERT:
 
2005
                case XT_LOG_ENT_DELETE:
 
2006
                case XT_LOG_ENT_UPDATE_BG:
 
2007
                case XT_LOG_ENT_INSERT_BG:
 
2008
                case XT_LOG_ENT_DELETE_BG:
 
2009
                        check_size = 2;
 
2010
                        op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);
 
2011
                        tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
 
2012
                        rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
 
2013
                        dptr = &record->xu.xu_rec_type_1;
 
2014
                        rec_size -= offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
 
2015
                        break;
 
2016
                case XT_LOG_ENT_UPDATE_FL:
 
2017
                case XT_LOG_ENT_INSERT_FL:
 
2018
                case XT_LOG_ENT_DELETE_FL:
 
2019
                case XT_LOG_ENT_UPDATE_FL_BG:
 
2020
                case XT_LOG_ENT_INSERT_FL_BG:
 
2021
                case XT_LOG_ENT_DELETE_FL_BG:
 
2022
                        check_size = 2;
 
2023
                        op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);
 
2024
                        tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
 
2025
                        rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
 
2026
                        free_rec_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);
 
2027
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
 
2028
                        dptr = &record->xf.xf_rec_type_1;
 
2029
                        rec_size -= offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1);
 
2030
                        break;
 
2031
                case XT_DEFUNKT_REC_FREED:
 
2032
                case XT_DEFUNKT_REC_REMOVED:
 
2033
                case XT_DEFUNKT_REC_REMOVED_EXT:
 
2034
                        op_seq = XT_GET_DISK_4(record->fr.fr_op_seq_4);
 
2035
                        tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);
 
2036
                        rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
 
2037
                        dptr = &record->fr.fr_stat_id_1;
 
2038
                        rec_size -= offsetof(XTactFreeRecEntryDRec, fr_stat_id_1);
 
2039
                        break;
 
2040
                case XT_LOG_ENT_REC_REMOVED_BI:
 
2041
                        check_size = 2;
 
2042
                        op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);
 
2043
                        tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
 
2044
                        rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
 
2045
                        free_rec_id = (xtWord4) record->rb.rb_new_rec_type_1;
 
2046
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
 
2047
                        dptr = &record->rb.rb_rec_type_1;
 
2048
                        rec_size -= offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1);
 
2049
                        break;
 
2050
                case XT_LOG_ENT_REC_REMOVED_BI_L:
 
2051
                        check_size = 2;
 
2052
                        op_seq = XT_GET_DISK_4(record->bl.bl_op_seq_4);
 
2053
                        tab_id = XT_GET_DISK_4(record->bl.bl_tab_id_4);
 
2054
                        rec_id = XT_GET_DISK_4(record->bl.bl_rec_id_4);
 
2055
                        free_rec_id = XT_GET_DISK_4(record->bl.bl_prev_rec_id_4);
 
2056
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
 
2057
                        free_rec_id = (xtWord4) record->bl.bl_new_rec_type_1;
 
2058
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
 
2059
                        dptr = &record->bl.bl_rec_type_1;
 
2060
                        rec_size -= offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1);
 
2061
                        break;
 
2062
                case XT_LOG_ENT_REC_MOVED:
 
2063
                case XT_LOG_ENT_REC_CLEANED:
 
2064
                case XT_LOG_ENT_REC_CLEANED_1:
 
2065
                case XT_LOG_ENT_REC_UNLINKED:
 
2066
                        op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);
 
2067
                        tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
 
2068
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
 
2069
                        dptr = &record->xw.xw_rec_type_1;
 
2070
                        rec_size -= offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
 
2071
                        break;
 
2072
                case XT_LOG_ENT_ROW_NEW:
 
2073
                case XT_LOG_ENT_ROW_NEW_FL:
 
2074
                        op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);
 
2075
                        tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
 
2076
                        rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
 
2077
                        if (record->xh.xh_status_1 == XT_LOG_ENT_ROW_NEW) {
 
2078
                                dptr = (xtWord1 *) record + offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
 
2079
                                rec_size -= offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
 
2080
                        }
 
2081
                        else {
 
2082
                                free_rec_id = XT_GET_DISK_4(record->xa.xa_free_list_4);
 
2083
                                sum ^= XT_CHECKSUM4_REC(free_rec_id);
 
2084
                                dptr = (xtWord1 *) record + sizeof(XTactRowAddedEntryDRec);
 
2085
                                rec_size -= sizeof(XTactRowAddedEntryDRec);
 
2086
                        }
 
2087
                        break;
 
2088
                case XT_LOG_ENT_ROW_ADD_REC:
 
2089
                case XT_LOG_ENT_ROW_SET:
 
2090
                case XT_LOG_ENT_ROW_FREED:
 
2091
                        op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);
 
2092
                        tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);
 
2093
                        rec_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
 
2094
                        dptr = (xtWord1 *) &record->wr.wr_ref_id_4;
 
2095
                        rec_size -= offsetof(XTactWriteRowEntryDRec, wr_ref_id_4);
 
2096
                        break;
 
2097
                case XT_LOG_ENT_OP_SYNC:
 
2098
                        return record->xl.xl_checksum_1 == (XT_CHECKSUM_1(XT_GET_DISK_4(record->os.os_time_4)) ^ XT_CHECKSUM_1(log_id));
 
2099
                case XT_LOG_ENT_NO_OP:
 
2100
                        sum = XT_GET_DISK_4(record->no.no_tab_id_4) ^ XT_GET_DISK_4(record->no.no_op_seq_4);
 
2101
                        return record->xe.xe_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
 
2102
                case XT_LOG_ENT_END_OF_LOG:
 
2103
                        return FALSE;
 
2104
                case XT_LOG_ENT_PREPARE:
 
2105
                        check_size = 2;
 
2106
                        op_seq = XT_GET_DISK_4(record->xp.xp_xact_id_4);
 
2107
                        tab_id = 0;
 
2108
                        rec_id = 0;
 
2109
                        dptr = record->xp.xp_xa_data;
 
2110
                        rec_size -= offsetof(XTXactPrepareEntryDRec, xp_xa_data);
 
2111
                        break;
 
2112
                default:
 
2113
                        ASSERT_NS(FALSE);
 
2114
                        return FALSE;
 
2115
        }
 
2116
 
 
2117
        sum ^= (xtWord4) op_seq ^ ((xtWord4) tab_id << 8) ^ XT_CHECKSUM4_REC(rec_id);
 
2118
 
 
2119
        if ((g = sum & 0xF0000000)) {
 
2120
                sum = sum ^ (g >> 24);
 
2121
                sum = sum ^ g;
 
2122
        }
 
2123
        for (u_int i=0; i<(u_int) rec_size; i++) {
 
2124
                sum = (sum << 4) + *dptr;
 
2125
                if ((g = sum & 0xF0000000)) {
 
2126
                        sum = sum ^ (g >> 24);
 
2127
                        sum = sum ^ g;
 
2128
                }
 
2129
                dptr++;
 
2130
        }
 
2131
 
 
2132
        if (check_size == 1) {
 
2133
                if (record->xh.xh_checksum_1 != (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id))) {
 
2134
                        return FAILED;
 
2135
                }
 
2136
        }
 
2137
        else {
 
2138
                if (XT_GET_DISK_2(record->xu.xu_checksum_2) != (XT_CHECKSUM_2(sum) ^ XT_CHECKSUM_2(log_id))) {
 
2139
                        return FAILED;
 
2140
                }
 
2141
        }
 
2142
        return TRUE;
 
2143
}
 
2144
 
 
2145
xtBool XTDatabaseLog::xlog_seq_next(XTXactSeqReadPtr seq, XTXactLogBufferDPtr *ret_entry, xtBool verify, XTThreadPtr thread)
 
2146
{
 
2147
        XTXactLogBufferDPtr     record;
 
2148
        size_t                          tfer;
 
2149
        size_t                          len;
 
2150
        size_t                          rec_offset;
 
2151
        size_t                          max_rec_len;
 
2152
        size_t                          size;
 
2153
        u_int                           check_size = 1;
 
2154
 
 
2155
        /* Go to the next record (xseq_record_len must be initialized
 
2156
         * to 0 for this to work.
 
2157
         */
 
2158
        seq->xseq_rec_log_offset += seq->xseq_record_len;
 
2159
        seq->xseq_record_len = 0;
 
2160
 
 
2161
        if (seq->xseq_rec_log_offset < seq->xseq_buf_log_offset ||
 
2162
                seq->xseq_rec_log_offset >= seq->xseq_buf_log_offset + (xtLogOffset) seq->xseq_buffer_len) {
 
2163
                /* The current position is nowhere near the buffer, read data into the
 
2164
                 * buffer:
 
2165
                 */
 
2166
                tfer = seq->xseq_buffer_size;
 
2167
                if (!xlog_rnd_read(seq, seq->xseq_rec_log_id, seq->xseq_rec_log_offset, tfer, seq->xseq_buffer, &tfer, thread))
 
2168
                        return FAILED;
 
2169
                seq->xseq_buf_log_offset = seq->xseq_rec_log_offset;
 
2170
                seq->xseq_buffer_len = tfer;
 
2171
 
 
2172
                /* Should we go to the next log? */
 
2173
                if (!tfer) {
 
2174
                        goto return_empty;
 
2175
                }
 
2176
        }
 
2177
 
 
2178
        /* The start of the record is in the buffer: */
 
2179
        read_from_buffer:
 
2180
        rec_offset = (size_t) (seq->xseq_rec_log_offset - seq->xseq_buf_log_offset);
 
2181
        max_rec_len = seq->xseq_buffer_len - rec_offset;
 
2182
        size = 0;
 
2183
 
 
2184
        /* Check the type of record: */
 
2185
        record = (XTXactLogBufferDPtr) (seq->xseq_buffer + rec_offset);
 
2186
        switch (record->xh.xh_status_1) {
 
2187
                case XT_LOG_ENT_HEADER:
 
2188
                        len = sizeof(XTXactLogHeaderDRec);
 
2189
                        break;
 
2190
                case XT_LOG_ENT_NEW_LOG:
 
2191
                case XT_LOG_ENT_DEL_LOG:
 
2192
                        len = sizeof(XTXactNewLogEntryDRec);
 
2193
                        break;
 
2194
                case XT_LOG_ENT_NEW_TAB:
 
2195
                        len = sizeof(XTXactNewTabEntryDRec);
 
2196
                        break;
 
2197
                case XT_LOG_ENT_COMMIT:
 
2198
                case XT_LOG_ENT_ABORT:
 
2199
                        len = sizeof(XTXactEndEntryDRec);
 
2200
                        break;
 
2201
                case XT_LOG_ENT_CLEANUP:
 
2202
                        len = sizeof(XTXactCleanupEntryDRec);
 
2203
                        break;
 
2204
                case XT_LOG_ENT_REC_MODIFIED:
 
2205
                case XT_LOG_ENT_UPDATE:
 
2206
                case XT_LOG_ENT_INSERT:
 
2207
                case XT_LOG_ENT_DELETE:
 
2208
                case XT_LOG_ENT_UPDATE_BG:
 
2209
                case XT_LOG_ENT_INSERT_BG:
 
2210
                case XT_LOG_ENT_DELETE_BG:
 
2211
                        check_size = 2;
 
2212
                        len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
 
2213
                        if (len > max_rec_len)
 
2214
                                /* The size is not in the buffer: */
 
2215
                                goto read_more;
 
2216
                        len += (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
 
2217
                        break;
 
2218
                case XT_LOG_ENT_UPDATE_FL:
 
2219
                case XT_LOG_ENT_INSERT_FL:
 
2220
                case XT_LOG_ENT_DELETE_FL:
 
2221
                case XT_LOG_ENT_UPDATE_FL_BG:
 
2222
                case XT_LOG_ENT_INSERT_FL_BG:
 
2223
                case XT_LOG_ENT_DELETE_FL_BG:
 
2224
                        check_size = 2;
 
2225
                        len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1);
 
2226
                        if (len > max_rec_len)
 
2227
                                /* The size is not in the buffer: */
 
2228
                                goto read_more;
 
2229
                        len += (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
 
2230
                        break;
 
2231
                case XT_DEFUNKT_REC_FREED:
 
2232
                case XT_DEFUNKT_REC_REMOVED:
 
2233
                case XT_DEFUNKT_REC_REMOVED_EXT:
 
2234
                        /* [(7)] REMOVE is now a extended version of FREE! */
 
2235
                        len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);
 
2236
                        break;
 
2237
                case XT_LOG_ENT_REC_REMOVED_BI:
 
2238
                        check_size = 2;
 
2239
                        len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1);
 
2240
                        if (len > max_rec_len)
 
2241
                                /* The size is not in the buffer: */
 
2242
                                goto read_more;
 
2243
                        len += (size_t) XT_GET_DISK_2(record->rb.rb_size_2);
 
2244
                        break;
 
2245
                case XT_LOG_ENT_REC_REMOVED_BI_L:
 
2246
                        check_size = 2;
 
2247
                        len = offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1);
 
2248
                        if (len > max_rec_len)
 
2249
                                /* The size is not in the buffer: */
 
2250
                                goto read_more;
 
2251
                        len += (size_t) XT_GET_DISK_2(record->bl.bl_size_2);
 
2252
                        break;
 
2253
                case XT_LOG_ENT_REC_MOVED:
 
2254
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;
 
2255
                        break;
 
2256
                case XT_LOG_ENT_REC_CLEANED:
 
2257
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
 
2258
                        break;
 
2259
                case XT_LOG_ENT_REC_CLEANED_1:
 
2260
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;
 
2261
                        break;
 
2262
                case XT_LOG_ENT_REC_UNLINKED:
 
2263
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
 
2264
                        break;
 
2265
                case XT_LOG_ENT_ROW_NEW:
 
2266
                        len = offsetof(XTactRowAddedEntryDRec, xa_row_id_4) + XT_ROW_ID_SIZE;
 
2267
                        break;
 
2268
                case XT_LOG_ENT_ROW_NEW_FL:
 
2269
                        len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4) + XT_ROW_ID_SIZE;
 
2270
                        break;
 
2271
                case XT_LOG_ENT_ROW_ADD_REC:
 
2272
                case XT_LOG_ENT_ROW_SET:
 
2273
                case XT_LOG_ENT_ROW_FREED:
 
2274
                        len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + XT_REF_ID_SIZE;
 
2275
                        break;
 
2276
                case XT_LOG_ENT_OP_SYNC:
 
2277
                        len = sizeof(XTactOpSyncEntryDRec);
 
2278
                        break;
 
2279
                case XT_LOG_ENT_NO_OP:
 
2280
                        len = sizeof(XTactNoOpEntryDRec);
 
2281
                        break;
 
2282
                case XT_LOG_ENT_END_OF_LOG: {
 
2283
                        off_t eof = seq->xseq_log_eof, adjust;
 
2284
                        
 
2285
                        if (eof > seq->xseq_rec_log_offset) {
 
2286
                                adjust = eof - seq->xseq_rec_log_offset;
 
2287
 
 
2288
                                seq->xseq_record_len = (size_t) adjust;
 
2289
                        }
 
2290
                        goto return_empty;
 
2291
                }
 
2292
                case XT_LOG_ENT_PREPARE:
 
2293
                        check_size = 2;
 
2294
                        len = offsetof(XTXactPrepareEntryDRec, xp_xa_data);
 
2295
                        if (len > max_rec_len)
 
2296
                                /* The size is not in the buffer: */
 
2297
                                goto read_more;
 
2298
                        len += (size_t) record->xp.xp_xa_len_1;
 
2299
                        break;
 
2300
                default:
 
2301
                        /* It is possible to land here after a crash, if the
 
2302
                         * log was not completely written.
 
2303
                         */
 
2304
                        seq->xseq_record_len = 0;
 
2305
                        goto return_empty;
 
2306
        }
 
2307
 
 
2308
        ASSERT_NS(len <= seq->xseq_buffer_size);
 
2309
        if (len <= max_rec_len) {
 
2310
                if (verify) {
 
2311
                        if (!xlog_verify(record, len, seq->xseq_rec_log_id)) {
 
2312
                                goto return_empty;
 
2313
                        }
 
2314
                }
 
2315
 
 
2316
                /* The record is completely in the buffer: */
 
2317
                seq->xseq_record_len = len;
 
2318
                *ret_entry = record;
 
2319
                return OK;
 
2320
        }
 
2321
        
 
2322
        /* The record is partially in the buffer. */
 
2323
        memmove(seq->xseq_buffer, seq->xseq_buffer + rec_offset, max_rec_len);
 
2324
        seq->xseq_buf_log_offset += rec_offset;
 
2325
        seq->xseq_buffer_len = max_rec_len;
 
2326
 
 
2327
        /* Read the rest, as far as possible: */
 
2328
        tfer = seq->xseq_buffer_size - max_rec_len;
 
2329
        if (!xlog_rnd_read(seq, seq->xseq_rec_log_id, seq->xseq_buf_log_offset + max_rec_len, tfer, seq->xseq_buffer + max_rec_len, &tfer, thread))
 
2330
                return FAILED;
 
2331
        seq->xseq_buffer_len += tfer;
 
2332
 
 
2333
        if (seq->xseq_buffer_len < len) {
 
2334
                /* A partial record is in the log, must be the end of the log: */
 
2335
                goto return_empty;
 
2336
        }
 
2337
 
 
2338
        /* The record is now completely in the buffer: */
 
2339
        seq->xseq_record_len = len;
 
2340
        *ret_entry = (XTXactLogBufferDPtr) seq->xseq_buffer;
 
2341
        return OK;
 
2342
 
 
2343
        read_more:
 
2344
        ASSERT_NS(len <= seq->xseq_buffer_size);
 
2345
        memmove(seq->xseq_buffer, seq->xseq_buffer + rec_offset, max_rec_len);
 
2346
        seq->xseq_buf_log_offset += rec_offset;
 
2347
        seq->xseq_buffer_len = max_rec_len;
 
2348
 
 
2349
        /* Read the rest, as far as possible: */
 
2350
        tfer = seq->xseq_buffer_size - max_rec_len;
 
2351
        if (!xlog_rnd_read(seq, seq->xseq_rec_log_id, seq->xseq_buf_log_offset + max_rec_len, tfer, seq->xseq_buffer + max_rec_len, &tfer, thread))
 
2352
                return FAILED;
 
2353
        seq->xseq_buffer_len += tfer;
 
2354
 
 
2355
        if (seq->xseq_buffer_len < len + size) {
 
2356
                /* We did not get as much as we need, return an empty record: */
 
2357
                goto return_empty;
 
2358
        }
 
2359
 
 
2360
        goto read_from_buffer;
 
2361
 
 
2362
        return_empty:
 
2363
        *ret_entry = NULL;
 
2364
        return OK;
 
2365
}
 
2366
 
 
2367
void XTDatabaseLog::xlog_seq_skip(XTXactSeqReadPtr seq, size_t size)
 
2368
{
 
2369
        seq->xseq_record_len += size;
 
2370
}
 
2371
 
 
2372
/* ----------------------------------------------------------------------
 
2373
 * W R I T E R    P R O C E S S
 
2374
 */
 
2375
 
 
2376
/*
 
2377
 * The log has been written. Wake the writer to commit the
 
2378
 * data to disk, if the transaction log cache is full.
 
2379
 *
 
2380
 * Data may not be written to the database until it has been
 
2381
 * flushed to the log.
 
2382
 *
 
2383
 * This is because there is no way to undo changes to the
 
2384
 * database.
 
2385
 *
 
2386
 * However, I have dicovered that writing constantly in the
 
2387
 * background can disturb the I/O in the foreground.
 
2388
 *
 
2389
 * So we can delay the writing of the database. But we should
 
2390
 * not delay it longer than we have transaction log cache.
 
2391
 *
 
2392
 * If so, the data that we need will fall out of the cache
 
2393
 * and we will have to read it again.
 
2394
 */
 
2395
static void xlog_wr_log_written(XTDatabaseHPtr db)
 
2396
{
 
2397
        if (db->db_wr_idle) {
 
2398
                xtWord8 cached_bytes;
 
2399
 
 
2400
                /* Determine if the cached log data is about to fall out of the cache. */
 
2401
                cached_bytes = db->db_xlog.xl_log_bytes_written - db->db_xlog.xl_log_bytes_read;
 
2402
                /* The limit is 75%: */
 
2403
                if (cached_bytes >= xt_xlog_cache.xlc_upper_limit) {
 
2404
                        if (!xt_broadcast_cond_ns(&db->db_wr_cond))
 
2405
                                xt_log_and_clear_exception_ns();
 
2406
                }
 
2407
        }
 
2408
}
 
2409
 
 
2410
/* Reason codes used by the writer when waiting for a write condition
 * (see xlog_wr_wait_for_write_condition()).
 */
enum {
	XT_MORE_TO_WRITE	= 1,
	XT_FREER_WAITING	= 2,
	XT_NO_ACTIVITY		= 3,
	XT_LOG_CACHE_FULL	= 4,
	XT_CHECKPOINT_REQ	= 5,
	XT_THREAD_WAITING	= 6,
	XT_TIME_TO_WRITE	= 7
};
 
2417
 
 
2418
/*
 
2419
 * Wait for a reason to write the data from the log to the database.
 
2420
 * This can be one of the following:
 
2421
 *
 
2422
 */
 
2423
static int xlog_wr_wait_for_write_condition(XTThreadPtr self, XTDatabaseHPtr db, int old_reason)
 
2424
{
 
2425
        xtXactID        last_xn_id;
 
2426
        xtWord8         cached_bytes;
 
2427
        int                     reason = XT_MORE_TO_WRITE;
 
2428
 
 
2429
#ifdef TRACE_WRITER_ACTIVITY
 
2430
        printf("WRITER --- DONE\n");
 
2431
#endif
 
2432
 
 
2433
        xt_lock_mutex(self, &db->db_wr_lock);
 
2434
        pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
2435
 
 
2436
        /*
 
2437
         * Wake the freeer if it is waiting for this writer, before
 
2438
         * we go to sleep!
 
2439
         */
 
2440
        if (db->db_wr_freeer_waiting) {
 
2441
                if (!xt_broadcast_cond_ns(&db->db_wr_cond))
 
2442
                        xt_log_and_clear_exception_ns();
 
2443
        }
 
2444
 
 
2445
        if (db->db_wr_flush_point_log_id == db->db_xlog.xl_flush_log_id &&
 
2446
                db->db_wr_flush_point_log_offset == db->db_xlog.xl_flush_log_offset) {
 
2447
                /* Wake the checkpointer to flush the indexes:
 
2448
                 * PMC 15.05.2008 - Not doing this anymore!
 
2449
                 *
 
2450
                 * PMC 1.07.2009 - Added this notification again, if the
 
2451
                 * reason why we wrote is because the checkpointer may have
 
2452
                 * been waiting:
 
2453
                 */
 
2454
                if (old_reason == XT_CHECKPOINT_REQ)
 
2455
                        xt_wake_checkpointer(self, db);
 
2456
 
 
2457
                /* Sleep as long as the flush point has not changed, from the last
 
2458
                 * target flush point.
 
2459
                 */
 
2460
                while (!self->t_quit &&
 
2461
                        db->db_wr_flush_point_log_id == db->db_xlog.xl_flush_log_id &&
 
2462
                        db->db_wr_flush_point_log_offset == db->db_xlog.xl_flush_log_offset &&
 
2463
                        reason != XT_LOG_CACHE_FULL &&
 
2464
                        reason != XT_TIME_TO_WRITE &&
 
2465
                        reason != XT_CHECKPOINT_REQ) {
 
2466
 
 
2467
                        /*
 
2468
                         * Sleep as long as there is no reason to write any more...
 
2469
                         */
 
2470
                        while (!self->t_quit) {
 
2471
                                last_xn_id = db->db_xn_curr_id;
 
2472
                                db->db_wr_idle = XT_THREAD_IDLE;
 
2473
                                xt_timed_wait_cond(self, &db->db_wr_cond, &db->db_wr_lock, 500);
 
2474
                                db->db_wr_idle = XT_THREAD_BUSY;
 
2475
                                /* These are the reasons for doing work: */
 
2476
                                /* The free'er thread is waiting for the writer: */
 
2477
                                if (db->db_wr_freeer_waiting) {
 
2478
                                        reason = XT_FREER_WAITING;
 
2479
                                        break;
 
2480
                                }
 
2481
                                /* Some thread is waiting for the writer: */
 
2482
                                if (db->db_wr_thread_waiting) {
 
2483
                                        reason = XT_THREAD_WAITING;
 
2484
                                        break;
 
2485
                                }
 
2486
                                /* Check if the cache will soon overflow... */
 
2487
                                ASSERT(db->db_xlog.xl_log_bytes_written >= db->db_xlog.xl_log_bytes_read);
 
2488
                                ASSERT(db->db_xlog.xl_log_bytes_written >= db->db_xlog.xl_log_bytes_flushed);
 
2489
                                /* Sanity check: */
 
2490
                                ASSERT(db->db_xlog.xl_log_bytes_written < db->db_xlog.xl_log_bytes_read + 500000000);
 
2491
                                /* This is the amount of data still to be written: */
 
2492
                                cached_bytes = db->db_xlog.xl_log_bytes_written - db->db_xlog.xl_log_bytes_read;
 
2493
                                /* The limit is 75%: */
 
2494
                                if (cached_bytes >= xt_xlog_cache.xlc_upper_limit) {
 
2495
                                        reason = XT_LOG_CACHE_FULL;
 
2496
                                        break;
 
2497
                                }
 
2498
                                
 
2499
                                /* Create a system variable which specifies the write frequency. */
 
2500
                                if (xt_db_record_write_threshold && cached_bytes >= xt_db_record_write_threshold) {
 
2501
                                        reason = XT_TIME_TO_WRITE;
 
2502
                                        break;
 
2503
                                }
 
2504
                                
 
2505
                                /* Check if we are holding up a checkpoint: */
 
2506
                                if (db->db_restart.xres_cp_required ||
 
2507
                                        db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
 
2508
                                        /* Enough data has been written for a checkpoint: */
 
2509
                                        if (!db->db_restart.xres_is_checkpoint_pending(db->db_wr_log_id, db->db_wr_log_offset)) {
 
2510
                                                /* But not enough data has been written for a checkpoint: */
 
2511
                                                reason = XT_CHECKPOINT_REQ;
 
2512
                                                break;
 
2513
                                        }
 
2514
                                }
 
2515
                                /* There is no activity, if the current ID has not changed during
 
2516
                                 * the wait, and the sweeper has nothing to do, and the checkpointer.
 
2517
                                 */
 
2518
                                if (db->db_xn_curr_id == last_xn_id &&
 
2519
                                        /* Changed xt_xn_get_curr_id(db) to db->db_xn_curr_id,
 
2520
                                         * This should work because we are not concerned about the difference
 
2521
                                         * between xt_xn_get_curr_id(db) and db->db_xn_curr_id,
 
2522
                                         * Which is just a matter of when transactions we can expect ot find
 
2523
                                         * in memory (see {GAP-INC-ADD-XACT})
 
2524
                                         */
 
2525
                                        xt_xn_is_before(db->db_xn_curr_id, db->db_xn_to_clean_id) && // db->db_xn_curr_id < db->db_xn_to_clean_id
 
2526
                                        !db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
 
2527
                                        /* There seems to be no activity at the moment.
 
2528
                                         * this might be a good time to write the log data.
 
2529
                                         */
 
2530
                                        reason = XT_NO_ACTIVITY;
 
2531
                                        break;
 
2532
                                }
 
2533
                        }
 
2534
                }
 
2535
        }
 
2536
        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
2537
 
 
2538
        if (reason == XT_LOG_CACHE_FULL || reason == XT_TIME_TO_WRITE || reason == XT_CHECKPOINT_REQ) {
 
2539
                /* Make sure that we have something to write: */
 
2540
                if (db->db_xlog.xlog_bytes_to_write() < 2 * 1204 * 1024)
 
2541
                        xt_xlog_flush_log(db, self);
 
2542
        }
 
2543
 
 
2544
#ifdef TRACE_WRITER_ACTIVITY
 
2545
        switch (reason) {
 
2546
                case XT_MORE_TO_WRITE:  printf("WRITER --- still more to write...\n"); break;
 
2547
                case XT_FREER_WAITING:  printf("WRITER --- free'er waiting for writer...\n"); break;
 
2548
                case XT_NO_ACTIVITY:    printf("WRITER --- no activity...\n"); break;
 
2549
                case XT_LOG_CACHE_FULL: printf("WRITER --- running out of log cache...\n"); break;
 
2550
                case XT_CHECKPOINT_REQ: printf("WRITER --- enough flushed for a checkpoint...\n"); break;
 
2551
                case XT_THREAD_WAITING: printf("WRITER --- thread waiting for writer...\n"); break;
 
2552
                case XT_TIME_TO_WRITE:  printf("WRITER --- limit of 12MB reached, time to write...\n"); break;
 
2553
        }
 
2554
#endif
 
2555
        
 
2556
        return reason;
 
2557
}
 
2558
 
 
2559
static void xlog_wr_could_go_faster(XTThreadPtr self, XTDatabaseHPtr db)
 
2560
{
 
2561
        if (db->db_wr_faster) {
 
2562
                if (!db->db_wr_fast) {
 
2563
                        xt_set_normal_priority(self);
 
2564
                        db->db_wr_fast = TRUE;
 
2565
                }
 
2566
                db->db_wr_faster = FALSE;
 
2567
        }
 
2568
}
 
2569
 
 
2570
static void xlog_wr_could_go_slower(XTThreadPtr self, XTDatabaseHPtr db)
 
2571
{
 
2572
        if (db->db_wr_fast && !db->db_wr_faster) {
 
2573
                xt_set_low_priority(self);
 
2574
                db->db_wr_fast = FALSE;
 
2575
        }
 
2576
}
 
2577
 
 
2578
static void xlog_wr_main(XTThreadPtr self)
 
2579
{
 
2580
        XTDatabaseHPtr          db = self->st_database;
 
2581
        XTWriterStatePtr        ws;
 
2582
        XTXactLogBufferDPtr     record;
 
2583
        int                                     reason = XT_NO_ACTIVITY;
 
2584
 
 
2585
        xt_set_low_priority(self);
 
2586
 
 
2587
        alloczr_(ws, xt_free_writer_state, sizeof(XTWriterStateRec), XTWriterStatePtr);
 
2588
        ws->ws_db = db;
 
2589
        ws->ws_in_recover = FALSE;
 
2590
 
 
2591
        if (!db->db_xlog.xlog_seq_init(&ws->ws_seqread, xt_db_log_buffer_size, FALSE))
 
2592
                xt_throw(self);
 
2593
 
 
2594
        if (!db->db_xlog.xlog_seq_start(&ws->ws_seqread, db->db_wr_log_id, db->db_wr_log_offset, FALSE))
 
2595
                xt_throw(self);
 
2596
 
 
2597
        while (!self->t_quit) {
 
2598
                while (!self->t_quit) {
 
2599
                        /* Determine the point to which we can write.
 
2600
                         * This is the current log flush point!
 
2601
                         */
 
2602
                        xt_lock_mutex_ns(&db->db_wr_lock);
 
2603
                        db->db_wr_flush_point_log_id = db->db_xlog.xl_flush_log_id;
 
2604
                        db->db_wr_flush_point_log_offset = db->db_xlog.xl_flush_log_offset;
 
2605
                        xt_unlock_mutex_ns(&db->db_wr_lock);
 
2606
 
 
2607
                        if (xt_comp_log_pos(db->db_wr_log_id, db->db_wr_log_offset, db->db_wr_flush_point_log_id, db->db_wr_flush_point_log_offset) >= 0) {
 
2608
                                break;
 
2609
                        }
 
2610
 
 
2611
                        while (!self->t_quit) {
 
2612
                                xlog_wr_could_go_faster(self, db);
 
2613
 
 
2614
                                /* This is the restart position: */
 
2615
                                xt_lock_mutex(self, &db->db_wr_lock);
 
2616
                                pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
2617
                                db->db_wr_log_id = ws->ws_seqread.xseq_rec_log_id;
 
2618
                                db->db_wr_log_offset = ws->ws_seqread.xseq_rec_log_offset +  ws->ws_seqread.xseq_record_len;
 
2619
                                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
2620
 
 
2621
                                if (xt_comp_log_pos(db->db_wr_log_id, db->db_wr_log_offset, db->db_wr_flush_point_log_id, db->db_wr_flush_point_log_offset) >= 0) {
 
2622
                                        break;
 
2623
                                }
 
2624
 
 
2625
                                /* Apply all changes that have been flushed to the log, to the
 
2626
                                 * database.
 
2627
                                 */
 
2628
                                if (!db->db_xlog.xlog_seq_next(&ws->ws_seqread, &record, FALSE, self))
 
2629
                                        xt_throw(self);
 
2630
                                if (!record) {
 
2631
                                        break;
 
2632
                                }
 
2633
                                switch (record->xl.xl_status_1) {
 
2634
                                        case XT_LOG_ENT_HEADER:
 
2635
                                                break;
 
2636
                                        case XT_LOG_ENT_NEW_LOG:
 
2637
                                                if (!db->db_xlog.xlog_seq_start(&ws->ws_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, TRUE))
 
2638
                                                        xt_throw(self);
 
2639
                                                break;
 
2640
                                        case XT_LOG_ENT_NEW_TAB:
 
2641
                                        case XT_LOG_ENT_COMMIT:
 
2642
                                        case XT_LOG_ENT_ABORT:
 
2643
                                        case XT_LOG_ENT_CLEANUP:
 
2644
                                        case XT_LOG_ENT_OP_SYNC:
 
2645
                                        case XT_LOG_ENT_PREPARE:
 
2646
                                                break;
 
2647
                                        case XT_LOG_ENT_DEL_LOG:
 
2648
                                                xtLogID log_id;
 
2649
 
 
2650
                                                log_id = XT_GET_DISK_4(record->xl.xl_log_id_4);
 
2651
                                                xt_dl_set_to_delete(self, db, log_id);
 
2652
                                                break;
 
2653
                                        default:
 
2654
                                                xt_xres_apply_in_order(self, ws, ws->ws_seqread.xseq_rec_log_id, ws->ws_seqread.xseq_rec_log_offset, record);
 
2655
                                                break;
 
2656
                                }
 
2657
                                /* Count the number of bytes read from the log: */
 
2658
                                db->db_xlog.xl_log_bytes_read += ws->ws_seqread.xseq_record_len;
 
2659
                        }
 
2660
                }
 
2661
 
 
2662
#ifdef XT_SORT_REC_WRITES
 
2663
                /* Flush because the freeer may be waiting! */
 
2664
                xt_xres_flush_all(self, ws);
 
2665
#endif
 
2666
 
 
2667
                if (ws->ws_ot) {
 
2668
                        xt_db_return_table_to_pool(self, ws->ws_ot);
 
2669
                        ws->ws_ot = NULL;
 
2670
                }
 
2671
 
 
2672
                xlog_wr_could_go_slower(self, db);
 
2673
 
 
2674
                /* Note, we delay writing the database for a maximum of
 
2675
                 * 2 seconds.
 
2676
                 */
 
2677
                reason = xlog_wr_wait_for_write_condition(self, db, reason);
 
2678
        }
 
2679
 
 
2680
        freer_(); // xt_free_writer_state(ss)
 
2681
}
 
2682
 
 
2683
static void *xlog_wr_run_thread(XTThreadPtr self)
 
2684
{
 
2685
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
 
2686
        int                             count;
 
2687
        void                    *mysql_thread;
 
2688
 
 
2689
        mysql_thread = myxt_create_thread();
 
2690
 
 
2691
        while (!self->t_quit) {
 
2692
                try_(a) {
 
2693
                        /*
 
2694
                         * The garbage collector requires that the database
 
2695
                         * is in use because.
 
2696
                         */
 
2697
                        xt_use_database(self, db, XT_FOR_WRITER);
 
2698
 
 
2699
                        /* This action is both safe and required (see {BACKGROUND-RELEASE-DB}) */
 
2700
                        xt_heap_release(self, self->st_database);
 
2701
 
 
2702
                        xlog_wr_main(self);
 
2703
                }
 
2704
                catch_(a) {
 
2705
                        /* This error is "normal"! */
 
2706
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
 
2707
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
 
2708
                                self->t_exception.e_sys_err == SIGTERM))
 
2709
                                xt_log_and_clear_exception(self);
 
2710
                }
 
2711
                cont_(a);
 
2712
 
 
2713
                /* Avoid releasing the database (done above) */
 
2714
                self->st_database = NULL;
 
2715
                xt_unuse_database(self, self);
 
2716
 
 
2717
                /* After an exception, pause before trying again... */
 
2718
                /* Number of seconds */
 
2719
#ifdef DEBUG
 
2720
                count = 10;
 
2721
#else
 
2722
                count = 2*60;
 
2723
#endif
 
2724
                db->db_wr_idle = XT_THREAD_INERR;
 
2725
                while (!self->t_quit && count > 0) {
 
2726
                        sleep(1);
 
2727
                        count--;
 
2728
                }
 
2729
                db->db_wr_idle = XT_THREAD_BUSY;
 
2730
        }
 
2731
 
 
2732
   /*
 
2733
        * {MYSQL-THREAD-KILL}
 
2734
        myxt_destroy_thread(mysql_thread, TRUE);
 
2735
        */
 
2736
        return NULL;
 
2737
}
 
2738
 
 
2739
static void xlog_wr_free_thread(XTThreadPtr self, void *data)
 
2740
{
 
2741
        XTDatabaseHPtr db = (XTDatabaseHPtr) data;
 
2742
 
 
2743
        if (db->db_wr_thread) {
 
2744
                xt_lock_mutex(self, &db->db_wr_lock);
 
2745
                pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
2746
                db->db_wr_thread = NULL;
 
2747
                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
2748
        }
 
2749
}
 
2750
 
 
2751
xtPublic void xt_start_writer(XTThreadPtr self, XTDatabaseHPtr db)
 
2752
{
 
2753
        char name[PATH_MAX];
 
2754
 
 
2755
        sprintf(name, "WR-%s", xt_last_directory_of_path(db->db_main_path));
 
2756
        xt_remove_dir_char(name);
 
2757
        db->db_wr_thread = xt_create_daemon(self, name);
 
2758
        xt_set_thread_data(db->db_wr_thread, db, xlog_wr_free_thread);
 
2759
        xt_run_thread(self, db->db_wr_thread, xlog_wr_run_thread);
 
2760
}
 
2761
 
 
2762
/*
 
2763
 * This function is called on database shutdown.
 
2764
 * We will wait a certain amount of time for the writer to
 
2765
 * complete its work.
 
2766
 * If it takes too long we will abort!
 
2767
 */
 
2768
xtPublic void xt_wait_for_writer(XTThreadPtr self, XTDatabaseHPtr db)
 
2769
{
 
2770
        time_t  then, now;
 
2771
        xtBool  message = FALSE;
 
2772
 
 
2773
        if (db->db_wr_thread) {
 
2774
                then = time(NULL);
 
2775
                while (xt_comp_log_pos(db->db_wr_log_id, db->db_wr_log_offset, db->db_wr_flush_point_log_id, db->db_wr_flush_point_log_offset) < 0) {
 
2776
 
 
2777
                        xt_lock_mutex(self, &db->db_wr_lock);
 
2778
                        pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
2779
                        db->db_wr_thread_waiting++;
 
2780
                        /* Wake the writer so that it con complete its work. */
 
2781
                        if (db->db_wr_idle) {
 
2782
                                if (!xt_broadcast_cond_ns(&db->db_wr_cond))
 
2783
                                        xt_log_and_clear_exception_ns();
 
2784
                        }
 
2785
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
2786
 
 
2787
                        xt_sleep_milli_second(10);
 
2788
 
 
2789
                        xt_lock_mutex(self, &db->db_wr_lock);
 
2790
                        pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
2791
                        db->db_wr_thread_waiting--;
 
2792
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
2793
 
 
2794
                        now = time(NULL);
 
2795
                        if (now >= then + 16) {
 
2796
                                xt_logf(XT_NT_INFO, "Aborting wait for '%s' writer\n", db->db_name);
 
2797
                                message = FALSE;
 
2798
                                break;
 
2799
                        }
 
2800
                        if (now >= then + 2) {
 
2801
                                if (!message) {
 
2802
                                        message = TRUE;
 
2803
                                        xt_logf(XT_NT_INFO, "Waiting for '%s' writer...\n", db->db_name);
 
2804
                                }
 
2805
                        }
 
2806
                }
 
2807
                
 
2808
                if (message)
 
2809
                        xt_logf(XT_NT_INFO, "Writer '%s' done.\n", db->db_name);
 
2810
        }
 
2811
}
 
2812
 
 
2813
xtPublic void xt_stop_writer(XTThreadPtr self, XTDatabaseHPtr db)
 
2814
{
 
2815
        XTThreadPtr thr_wr;
 
2816
 
 
2817
        if (db->db_wr_thread) {
 
2818
                xt_lock_mutex(self, &db->db_wr_lock);
 
2819
                pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
2820
 
 
2821
                /* This pointer is safe as long as you have the transaction lock. */
 
2822
                if ((thr_wr = db->db_wr_thread)) {
 
2823
                        xtThreadID tid = thr_wr->t_id;
 
2824
 
 
2825
                        /* Make sure the thread quits when woken up. */
 
2826
                        xt_terminate_thread(self, thr_wr);
 
2827
 
 
2828
                        /* Wake the writer thread so that it will quit: */
 
2829
                        xt_broadcast_cond(self, &db->db_wr_cond);
 
2830
        
 
2831
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
2832
 
 
2833
                        /*
 
2834
                         * GOTCHA: This is a wierd thing but the SIGTERM directed
 
2835
                         * at a particular thread (in this case the sweeper) was
 
2836
                         * being caught by a different thread and killing the server
 
2837
                         * sometimes. Disconcerting.
 
2838
                         * (this may only be a problem on Mac OS X)
 
2839
                        xt_kill_thread(thread);
 
2840
                         */
 
2841
                        xt_wait_for_thread_to_exit(tid, FALSE);
 
2842
        
 
2843
                        /* PMC - This should not be necessary to set the signal here, but in the
 
2844
                         * debugger the handler is not called!!?
 
2845
                        thr_wr->t_delayed_signal = SIGTERM;
 
2846
                        xt_kill_thread(thread);
 
2847
                         */
 
2848
                        db->db_wr_thread = NULL;
 
2849
                }
 
2850
                else
 
2851
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
2852
        }
 
2853
}
 
2854
 
 
2855
#ifdef NOT_USED
 
2856
static void xlog_add_to_flush_buffer(u_int flush_count, XTXLogBlockPtr *flush_buffer, XTXLogBlockPtr block)
 
2857
{
 
2858
        register u_int          count = flush_count;
 
2859
        register u_int          i;
 
2860
        register u_int          guess;
 
2861
        register xtInt8         r;
 
2862
 
 
2863
        i = 0;
 
2864
        while (i < count) {
 
2865
                guess = (i + count - 1) >> 1;
 
2866
                r = (xtInt8) block->xlb_address - (xtInt8) flush_buffer[guess]->xlb_address;
 
2867
                if (r == 0) {
 
2868
                        // Should not happen...
 
2869
                        ASSERT_NS(FALSE);
 
2870
                        return;
 
2871
                }
 
2872
                if (r < (xtInt8) 0)
 
2873
                        count = guess;
 
2874
                else
 
2875
                        i = guess + 1;
 
2876
        }
 
2877
 
 
2878
        /* Insert at position i */
 
2879
        memmove(flush_buffer + i + 1, flush_buffer + i, (flush_count - i) * sizeof(XTXLogBlockPtr));
 
2880
        flush_buffer[i] = block;
 
2881
}
 
2882
 
 
2883
static XTXLogBlockPtr xlog_find_block(XTOpenFilePtr file, xtLogID log_id, off_t address, XTXLogCacheSegPtr *ret_seg)
 
2884
{
 
2885
        register XTXLogCacheSegPtr      seg;
 
2886
        register XTXLogBlockPtr         block;
 
2887
        register u_int                          hash_idx;
 
2888
        register XTXLogCacheRec         *dcg = &xt_xlog_cache;
 
2889
 
 
2890
        seg = &dcg->xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
 
2891
        hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % dcg->xlc_hash_size;
 
2892
 
 
2893
        xt_lock_mutex_ns(&seg->lcs_lock);
 
2894
        retry:
 
2895
        block = seg->lcs_hash_table[hash_idx];
 
2896
        while (block) {
 
2897
                if (block->xlb_address == address && block->xlb_log_id == log_id) {
 
2898
                        ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
 
2899
 
 
2900
                        /* Wait if the block is being read or written.
 
2901
                         * If we will just read the data, then we don't care
 
2902
                         * if the buffer is being written.
 
2903
                         */
 
2904
                        if (block->xlb_state == XLC_BLOCK_READING) {
 
2905
                                if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100))
 
2906
                                        break;
 
2907
                                goto retry;
 
2908
                        }
 
2909
 
 
2910
                        *ret_seg = seg;
 
2911
                        return block;
 
2912
                }
 
2913
                block = block->xlb_next;
 
2914
        }
 
2915
        
 
2916
        /* Block not found: */
 
2917
        xt_unlock_mutex_ns(&seg->lcs_lock);
 
2918
        return NULL;
 
2919
}
 
2920
 
 
2921
static int xlog_cmp_log_files(struct XTThread *self, register const void *thunk, register const void *a, register const void *b)
 
2922
{
 
2923
#pragma unused(self, thunk)
 
2924
        xtLogID                         lf_id = *((xtLogID *) a);
 
2925
        XTXactLogFilePtr        lf_ptr = (XTXactLogFilePtr) b;
 
2926
 
 
2927
        if (lf_id < lf_ptr->lf_log_id)
 
2928
                return -1;
 
2929
        if (lf_id == lf_ptr->lf_log_id)
 
2930
                return 0;
 
2931
        return 1;
 
2932
}
 
2933
 
 
2934
#endif
 
2935
 
 
2936
 
 
2937
#ifdef OLD_CODE
 
2938
static xtBool xlog_free_lru_blocks()
 
2939
{
 
2940
        XTXLogBlockPtr          block, pblock;
 
2941
        xtWord4                         ru_time;
 
2942
        xtLogID                         log_id;
 
2943
        off_t                           address;
 
2944
        //off_t                         hash;
 
2945
        XTXLogCacheSegPtr       seg;
 
2946
        u_int                           hash_idx;
 
2947
        xtBool                          have_global_lock = FALSE;
 
2948
 
 
2949
#ifdef DEBUG_CHECK_CACHE
 
2950
        //xt_xlog_check_cache();
 
2951
#endif
 
2952
        retry:
 
2953
        if (!(block = xt_xlog_cache.xlc_lru_block))
 
2954
                return OK;
 
2955
 
 
2956
        ru_time = block->xlb_ru_time;
 
2957
        log_id = block->xlb_log_id;
 
2958
        address = block->xlb_address;
 
2959
 
 
2960
        /*
 
2961
        hash = (address >> XT_XLC_BLOCK_SHIFTS) ^ ((off_t) log_id << 28);
 
2962
        seg = &xt_xlog_cache.xlc_segment[hash & XLC_SEGMENT_MASK];
 
2963
        hash_idx = (hash >> XT_XLC_SEGMENT_SHIFTS) % xt_xlog_cache.xlc_hash_size;
 
2964
        */
 
2965
        seg = &xt_xlog_cache.xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
 
2966
        hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % xt_xlog_cache.xlc_hash_size;
 
2967
 
 
2968
        xt_lock_mutex_ns(&seg->lcs_lock);
 
2969
 
 
2970
        free_more:
 
2971
        pblock = NULL;
 
2972
        block = seg->lcs_hash_table[hash_idx];
 
2973
        while (block) {
 
2974
                if (block->xlb_address == address && block->xlb_log_id == log_id) {
 
2975
                        ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
 
2976
                        
 
2977
                        /* Try again if the block has been used in the meantime: */
 
2978
                        if (ru_time != block->xlb_ru_time) {
 
2979
                                if (have_global_lock)
 
2980
                                        /* Having this lock means we have already freed at least one block so
 
2981
                                         * don't bother to free more if we are having trouble.
 
2982
                                         */
 
2983
                                        goto done_ok;
 
2984
 
 
2985
                                /* If the recently used time has changed, then the
 
2986
                                 * block is probably no longer the LR used.
 
2987
                                 */
 
2988
                                xt_unlock_mutex_ns(&seg->lcs_lock);
 
2989
                                goto retry;
 
2990
                        }
 
2991
 
 
2992
                        /* Wait if the block is being read: */
 
2993
                        if (block->xlb_state == XLC_BLOCK_READING) {
 
2994
                                if (have_global_lock)
 
2995
                                        goto done_ok;
 
2996
 
 
2997
                                /* Wait for the block to be read, then try again. */
 
2998
                                if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100))
 
2999
                                        goto failed;
 
3000
                                xt_unlock_mutex_ns(&seg->lcs_lock);
 
3001
                                goto retry;
 
3002
                        }
 
3003
                        
 
3004
                        goto free_the_block;
 
3005
                }
 
3006
                pblock = block;
 
3007
                block = block->xlb_next;
 
3008
        }
 
3009
 
 
3010
        if (have_global_lock) {
 
3011
                xt_unlock_mutex_ns(&xt_xlog_cache.xlc_lock);
 
3012
                have_global_lock = FALSE;
 
3013
        }
 
3014
 
 
3015
        /* We did not find the block, someone else freed it... */
 
3016
        xt_unlock_mutex_ns(&seg->lcs_lock);
 
3017
        goto retry;
 
3018
 
 
3019
        free_the_block:
 
3020
        ASSERT_NS(block->xlb_state == XLC_BLOCK_CLEAN);
 
3021
 
 
3022
        /* Remove from the hash table: */
 
3023
        if (pblock)
 
3024
                pblock->xlb_next = block->xlb_next;
 
3025
        else
 
3026
                seg->lcs_hash_table[hash_idx] = block->xlb_next;
 
3027
 
 
3028
        /* Now free the block */
 
3029
        if (!have_global_lock) {
 
3030
                xt_lock_mutex_ns(&xt_xlog_cache.xlc_lock);
 
3031
                have_global_lock = TRUE;
 
3032
        }
 
3033
 
 
3034
        /* Remove from the MRU list: */
 
3035
        if (xt_xlog_cache.xlc_lru_block == block)
 
3036
                xt_xlog_cache.xlc_lru_block = block->xlb_mr_used;
 
3037
        if (xt_xlog_cache.xlc_mru_block == block)
 
3038
                xt_xlog_cache.xlc_mru_block = block->xlb_lr_used;
 
3039
        if (block->xlb_lr_used)
 
3040
                block->xlb_lr_used->xlb_mr_used = block->xlb_mr_used;
 
3041
        if (block->xlb_mr_used)
 
3042
                block->xlb_mr_used->xlb_lr_used = block->xlb_lr_used;
 
3043
 
 
3044
        /* Put the block on the free list: */
 
3045
        block->xlb_next = xt_xlog_cache.xlc_free_list;
 
3046
        xt_xlog_cache.xlc_free_list = block;
 
3047
        xt_xlog_cache.xlc_free_count++;
 
3048
        block->xlb_state = XLC_BLOCK_FREE;
 
3049
 
 
3050
        if (xt_xlog_cache.xlc_free_count < XT_XLC_MAX_FREE_COUNT) {
 
3051
                /* Now that we have all the locks, try to free some more in this segment: */
 
3052
                block = block->xlb_mr_used;
 
3053
                for (u_int i=0; block && i<XLC_SEGMENT_COUNT; i++) {
 
3054
                        ru_time = block->xlb_ru_time;
 
3055
                        log_id = block->xlb_log_id;
 
3056
                        address = block->xlb_address;
 
3057
 
 
3058
                        if (seg == &xt_xlog_cache.xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK]) {
 
3059
                                hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % xt_xlog_cache.xlc_hash_size;
 
3060
                                goto free_more;
 
3061
                        }
 
3062
                        block = block->xlb_mr_used;
 
3063
                }
 
3064
        }
 
3065
 
 
3066
        done_ok:
 
3067
        xt_unlock_mutex_ns(&xt_xlog_cache.xlc_lock);
 
3068
        xt_unlock_mutex_ns(&seg->lcs_lock);
 
3069
#ifdef DEBUG_CHECK_CACHE
 
3070
        //xt_xlog_check_cache();
 
3071
#endif
 
3072
        return OK;
 
3073
        
 
3074
        failed:
 
3075
        xt_unlock_mutex_ns(&seg->lcs_lock);
 
3076
#ifdef DEBUG_CHECK_CACHE
 
3077
        //xt_xlog_check_cache();
 
3078
#endif
 
3079
        return FAILED;
 
3080
}
 
3081
 
 
3082
#endif