~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/xactlog_xt.cc

  • Committer: Devananda
  • Date: 2009-07-04 01:55:13 UTC
  • mto: (1086.10.1 length-plugin)
  • mto: This revision was merged to the branch mainline in revision 1095.
  • Revision ID: deva@myst-20090704015513-gtqliazxtfm7sdvf
refactored function/length into plugin/length

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2007 PrimeBase Technologies GmbH
2
 
 *
3
 
 * PrimeBase XT
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * 2007-10-30   Paul McCullagh
20
 
 *
21
 
 * H&G2JCtL
22
 
 *
23
 
 * The transaction log contains all operations on the data handle
24
 
 * and row pointer files of a table.
25
 
 *
26
 
 * The transaction log does not contain operations on index data.
27
 
 */
28
 
 
29
 
#include "xt_config.h"
30
 
 
31
 
#ifdef DRIZZLED
32
 
#include <bitset>
33
 
#endif
34
 
 
35
 
#include <signal.h>
36
 
 
37
 
#include "xactlog_xt.h"
38
 
#include "database_xt.h"
39
 
#include "util_xt.h"
40
 
#include "strutil_xt.h"
41
 
#include "filesys_xt.h"
42
 
#include "myxt_xt.h"
43
 
#include "trace_xt.h"
44
 
 
45
 
#ifdef DEBUG
46
 
//#define PRINT_TABLE_MODIFICATIONS
47
 
//#define TRACE_WRITER_ACTIVITY
48
 
#endif
49
 
#ifndef XT_WIN
50
 
#ifndef XT_MAC
51
 
#define PREWRITE_LOG_COMPLETELY
52
 
#endif
53
 
#endif
54
 
 
55
 
static void xlog_wr_log_written(XTDatabaseHPtr db);
56
 
 
57
 
/*
58
 
 * -----------------------------------------------------------------------
59
 
 * T R A N S A C T I O N   L O G   C A C H E
60
 
 */
61
 
 
62
 
static XTXLogCacheRec   xt_xlog_cache;
63
 
 
64
 
/*
65
 
 * Initialize the disk cache.
66
 
 */
67
 
xtPublic void xt_xlog_init(XTThreadPtr self, size_t cache_size)
68
 
{
69
 
        XTXLogBlockPtr  block;
70
 
 
71
 
        /*
72
 
         * This is required to ensure that the block
73
 
         * works!
74
 
         */
75
 
 
76
 
        /* Determine the number of block that will fit into the given memory: */
77
 
        /*
78
 
        xt_xlog_cache.xlc_hash_size = (cache_size / (XLC_SEGMENT_COUNT * sizeof(XTXLogBlockPtr) + sizeof(XTXLogBlockRec))) / (XLC_SEGMENT_COUNT >> 1);
79
 
        xt_xlog_cache.xlc_block_count = (cache_size - (XLC_SEGMENT_COUNT * xt_xlog_cache.xlc_hash_size * sizeof(XTXLogBlockPtr))) / sizeof(XTXLogBlockRec);
80
 
        */
81
 
        /* Do not count the size of the cache directory towards the cache size: */
82
 
        xt_xlog_cache.xlc_block_count = cache_size / sizeof(XTXLogBlockRec);
83
 
        xt_xlog_cache.xlc_upper_limit = ((xtWord8) xt_xlog_cache.xlc_block_count * (xtWord8) XT_XLC_BLOCK_SIZE * (xtWord8) 3) / (xtWord8) 4;
84
 
        xt_xlog_cache.xlc_hash_size = xt_xlog_cache.xlc_block_count / (XLC_SEGMENT_COUNT >> 1);
85
 
        if (!xt_xlog_cache.xlc_hash_size)
86
 
                xt_xlog_cache.xlc_hash_size = 1;
87
 
 
88
 
        try_(a) {
89
 
                for (u_int i=0; i<XLC_SEGMENT_COUNT; i++) {
90
 
                        xt_xlog_cache.xlc_segment[i].lcs_hash_table = (XTXLogBlockPtr *) xt_calloc(self, xt_xlog_cache.xlc_hash_size * sizeof(XTXLogBlockPtr));
91
 
                        xt_init_mutex_with_autoname(self, &xt_xlog_cache.xlc_segment[i].lcs_lock);
92
 
                        xt_init_cond(self, &xt_xlog_cache.xlc_segment[i].lcs_cond);
93
 
                }
94
 
 
95
 
                block = (XTXLogBlockPtr) xt_malloc(self, xt_xlog_cache.xlc_block_count * sizeof(XTXLogBlockRec));
96
 
                xt_xlog_cache.xlc_blocks = block; 
97
 
                xt_xlog_cache.xlc_blocks_end = (XTXLogBlockPtr) ((char *) block + (xt_xlog_cache.xlc_block_count * sizeof(XTXLogBlockRec))); 
98
 
                xt_xlog_cache.xlc_next_to_free = block; 
99
 
                xt_init_mutex_with_autoname(self, &xt_xlog_cache.xlc_lock);
100
 
                xt_init_cond(self, &xt_xlog_cache.xlc_cond);
101
 
 
102
 
                for (u_int i=0; i<xt_xlog_cache.xlc_block_count; i++) {
103
 
                        block->xlb_address = 0;
104
 
                        block->xlb_log_id = 0;
105
 
                        block->xlb_state = XLC_BLOCK_FREE;
106
 
                        block++;
107
 
                }
108
 
                xt_xlog_cache.xlc_free_count = xt_xlog_cache.xlc_block_count;
109
 
        }
110
 
        catch_(a) {
111
 
                XTException e;
112
 
 
113
 
                xt_enter_exception_handler(self, &e);
114
 
                xt_xlog_exit(self);
115
 
                xt_exit_exception_handler(self, &e);
116
 
                xt_throw(self);
117
 
        }
118
 
        cont_(a);
119
 
}
120
 
 
121
 
xtPublic void xt_xlog_exit(XTThreadPtr XT_UNUSED(self))
122
 
{
123
 
        for (u_int i=0; i<XLC_SEGMENT_COUNT; i++) {
124
 
                if (xt_xlog_cache.xlc_segment[i].lcs_hash_table) {
125
 
                        xt_free_ns(xt_xlog_cache.xlc_segment[i].lcs_hash_table);
126
 
                        xt_xlog_cache.xlc_segment[i].lcs_hash_table = NULL;
127
 
                        xt_free_mutex(&xt_xlog_cache.xlc_segment[i].lcs_lock);
128
 
                        xt_free_cond(&xt_xlog_cache.xlc_segment[i].lcs_cond);
129
 
                }
130
 
        }
131
 
 
132
 
        if (xt_xlog_cache.xlc_blocks) {
133
 
                xt_free_ns(xt_xlog_cache.xlc_blocks);
134
 
                xt_xlog_cache.xlc_blocks = NULL;
135
 
                xt_free_mutex(&xt_xlog_cache.xlc_lock);
136
 
                xt_free_cond(&xt_xlog_cache.xlc_cond);
137
 
        }
138
 
        memset(&xt_xlog_cache, 0, sizeof(xt_xlog_cache));
139
 
}
140
 
 
141
 
xtPublic xtInt8 xt_xlog_get_usage()
142
 
{
143
 
        xtInt8 size;
144
 
 
145
 
        size = (xtInt8) (xt_xlog_cache.xlc_block_count - xt_xlog_cache.xlc_free_count) * sizeof(XTXLogBlockRec);
146
 
        return size;
147
 
}
148
 
 
149
 
xtPublic xtInt8 xt_xlog_get_size()
150
 
{
151
 
        xtInt8 size;
152
 
 
153
 
        size = (xtInt8) xt_xlog_cache.xlc_block_count * sizeof(XTXLogBlockRec);
154
 
        return size;
155
 
}
156
 
 
157
 
xtPublic xtLogID xt_xlog_get_min_log(XTThreadPtr self, XTDatabaseHPtr db)
158
 
{
159
 
        char                    path[PATH_MAX];
160
 
        XTOpenDirPtr    od;
161
 
        char                    *file;
162
 
        xtLogID                 log_id, min_log = 0;
163
 
 
164
 
        xt_strcpy(PATH_MAX, path, db->db_main_path);
165
 
        xt_add_system_dir(PATH_MAX, path);
166
 
        if (xt_fs_exists(path)) {
167
 
                pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
168
 
                while (xt_dir_next(self, od)) {
169
 
                        file = xt_dir_name(self, od);
170
 
                        if (xt_starts_with(file, "xlog")) {
171
 
                                if ((log_id = (xtLogID) xt_file_name_to_id(file))) {
172
 
                                        if (!min_log || log_id < min_log)
173
 
                                                min_log = log_id;
174
 
                                }
175
 
                        }
176
 
                }
177
 
                freer_(); // xt_dir_close(od)
178
 
        }
179
 
        if (!min_log)
180
 
                return 1;
181
 
        return min_log;
182
 
}
183
 
 
184
 
xtPublic void xt_xlog_delete_logs(XTThreadPtr self, XTDatabaseHPtr db)
185
 
{
186
 
        char                    path[PATH_MAX];
187
 
        XTOpenDirPtr    od;
188
 
        char                    *file;
189
 
 
190
 
        /* Close all the index logs before we delete them: */
191
 
        db->db_indlogs.ilp_close(self, TRUE);
192
 
 
193
 
        /* Close the transaction logs too: */
194
 
        db->db_xlog.xlog_close(self);
195
 
 
196
 
        xt_strcpy(PATH_MAX, path, db->db_main_path);
197
 
        xt_add_system_dir(PATH_MAX, path);
198
 
        if (!xt_fs_exists(path))
199
 
                return;
200
 
        pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
201
 
        while (xt_dir_next(self, od)) {
202
 
                file = xt_dir_name(self, od);
203
 
                if (xt_ends_with(file, ".xt")) {
204
 
                        xt_add_dir_char(PATH_MAX, path);
205
 
                        xt_strcat(PATH_MAX, path, file);
206
 
                        xt_fs_delete(self, path);
207
 
                        xt_remove_last_name_of_path(path);
208
 
                }
209
 
        }
210
 
        freer_(); // xt_dir_close(od)
211
 
 
212
 
        /* I no longer attach the condition: !db->db_multi_path
213
 
         * to removing this directory. This is because
214
 
         * the pbxt directory must now be removed explicitly
215
 
         * by drop database, or by delete all the PBXT
216
 
         * system tables.
217
 
         */
218
 
        if (!xt_fs_rmdir(NULL, path))
219
 
                xt_log_and_clear_exception(self);
220
 
}
221
 
 
222
 
#ifdef DEBUG_CHECK_CACHE
/*
 * Debug consistency check of the cache lists: walks the LRU list and
 * the free list and asserts that the links, block states and counters
 * agree with each other.
 */
static void xt_xlog_check_cache(void)
{
	XTXLogBlockPtr	block, pblock;
	u_int			used_count = 0;
	u_int			free_count = 0;

	/* LRU list: no free blocks, and every back link matches the predecessor. */
	pblock = NULL;
	for (block = xt_xlog_cache.xlc_lru_block; block; block = block->xlb_mr_used) {
		used_count++;
		ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
		ASSERT_NS(block->xlb_lr_used == pblock);
		pblock = block;
	}
	ASSERT_NS(xt_xlog_cache.xlc_mru_block == pblock);
	ASSERT_NS(xt_xlog_cache.xlc_free_count + used_count == xt_xlog_cache.xlc_block_count);

	/* Free list: only free blocks, and as many as the counter says. */
	for (block = xt_xlog_cache.xlc_free_list; block; block = block->xlb_next) {
		free_count++;
		ASSERT_NS(block->xlb_state == XLC_BLOCK_FREE);
	}
	ASSERT_NS(xt_xlog_cache.xlc_free_count == free_count);
}
#endif
254
 
 
255
 
#ifdef FOR_DEBUG
/*
 * Debug check: asserts that the given block is NOT on the LRU list, and
 * that the back links of the list are consistent.
 */
static void xlog_check_lru_list(XTXLogBlockPtr block)
{
	XTXLogBlockPtr list_block;
	XTXLogBlockPtr plist_block = NULL;

	for (list_block = xt_xlog_cache.xlc_lru_block; list_block; list_block = list_block->xlb_mr_used) {
		ASSERT_NS(block != list_block);
		ASSERT_NS(list_block->xlb_lr_used == plist_block);
		plist_block = list_block;
	}
	ASSERT_NS(xt_xlog_cache.xlc_mru_block == plist_block);
}
#endif
271
 
 
272
 
/*
273
 
 * Log cache blocks are used and freed on a round-robin basis.
274
 
 * In addition, only data read by restart, and data transferred
275
 
 * from the transaction log are stored in the transaction log.
276
 
 *
277
 
 * This ensures that the transaction log contains the most
278
 
 * recently written log data.
279
 
 *
280
 
 * If the sweeper gets behind due to a long running transaction
281
 
 * then it falls out of the log cache, and must read from
282
 
 * the log files directly.
283
 
 *
284
 
 * This data read is no longer cached as it was previously.
285
 
 * This has the advantage that it does not disturb the writer
286
 
 * thread which would otherwise hit the cache.
287
 
 *
288
 
 * If transactions are not too long, it should be possible
289
 
 * to keep the sweeper in the log cache.
290
 
 */
291
 
static xtBool xlog_free_block(XTXLogBlockPtr to_free)
292
 
{
293
 
        XTXLogBlockPtr          block, pblock;
294
 
        xtLogID                         log_id;
295
 
        off_t                           address;
296
 
        XTXLogCacheSegPtr       seg;
297
 
        u_int                           hash_idx;
298
 
 
299
 
        retry:
300
 
        log_id = to_free->xlb_log_id;
301
 
        address = to_free->xlb_address;
302
 
 
303
 
        seg = &xt_xlog_cache.xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
304
 
        hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % xt_xlog_cache.xlc_hash_size;
305
 
 
306
 
        xt_lock_mutex_ns(&seg->lcs_lock);
307
 
        if (to_free->xlb_state == XLC_BLOCK_FREE)
308
 
                goto done_ok;
309
 
        if (to_free->xlb_log_id != log_id || to_free->xlb_address != address) {
310
 
                xt_unlock_mutex_ns(&seg->lcs_lock);
311
 
                goto retry;
312
 
        }
313
 
 
314
 
        pblock = NULL;
315
 
        block = seg->lcs_hash_table[hash_idx];
316
 
        while (block) {
317
 
                if (block->xlb_address == address && block->xlb_log_id == log_id) {
318
 
                        ASSERT_NS(block == to_free);
319
 
                        ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
320
 
                        
321
 
                        /* Wait if the block is being read: */
322
 
                        if (block->xlb_state == XLC_BLOCK_READING) {
323
 
                                /* Wait for the block to be read, then try again. */
324
 
                                if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100))
325
 
                                        goto failed;
326
 
                                xt_unlock_mutex_ns(&seg->lcs_lock);
327
 
                                goto retry;
328
 
                        }
329
 
                        
330
 
                        goto free_the_block;
331
 
                }
332
 
                pblock = block;
333
 
                block = block->xlb_next;
334
 
        }
335
 
 
336
 
        /* We did not find the block, someone else freed it... */
337
 
        xt_unlock_mutex_ns(&seg->lcs_lock);
338
 
        goto retry;
339
 
 
340
 
        free_the_block:
341
 
        ASSERT_NS(block->xlb_state == XLC_BLOCK_CLEAN);
342
 
 
343
 
        /* Remove from the hash table: */
344
 
        if (pblock)
345
 
                pblock->xlb_next = block->xlb_next;
346
 
        else
347
 
                seg->lcs_hash_table[hash_idx] = block->xlb_next;
348
 
 
349
 
        /* Free the block: */
350
 
        xt_xlog_cache.xlc_free_count++;
351
 
        block->xlb_state = XLC_BLOCK_FREE;
352
 
 
353
 
        done_ok:
354
 
        xt_unlock_mutex_ns(&seg->lcs_lock);
355
 
        return OK;
356
 
        
357
 
        failed:
358
 
        xt_unlock_mutex_ns(&seg->lcs_lock);
359
 
        return FAILED;
360
 
}
361
 
 
362
 
#define XT_FETCH_READ           0
363
 
#define XT_FETCH_BLANK          1
364
 
#define XT_FETCH_TEST           2
365
 
 
366
 
static xtBool xlog_fetch_block(XTXLogBlockPtr *ret_block, XTOpenFilePtr file, xtLogID log_id, off_t address, XTXLogCacheSegPtr *ret_seg, int fetch_type, XTThreadPtr thread)
367
 
{
368
 
        register XTXLogBlockPtr         block;
369
 
        register XTXLogCacheSegPtr      seg;
370
 
        register u_int                          hash_idx;
371
 
        register XTXLogCacheRec         *dcg = &xt_xlog_cache;
372
 
        size_t                                          red_size;
373
 
 
374
 
        /* Make sure we have a free block ready (to avoid unlock below): */
375
 
        if (fetch_type != XT_FETCH_TEST && dcg->xlc_next_to_free->xlb_state != XLC_BLOCK_FREE) {
376
 
                if (!xlog_free_block(dcg->xlc_next_to_free))
377
 
                        return FAILED;
378
 
        }
379
 
 
380
 
        seg = &dcg->xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
381
 
        hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % dcg->xlc_hash_size;
382
 
 
383
 
        xt_lock_mutex_ns(&seg->lcs_lock);
384
 
        retry:
385
 
        block = seg->lcs_hash_table[hash_idx];
386
 
        while (block) {
387
 
                if (block->xlb_address == address && block->xlb_log_id == log_id) {
388
 
                        ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
389
 
 
390
 
                        /*
391
 
                         * Wait if the block is being read.
392
 
                         */
393
 
                        if (block->xlb_state == XLC_BLOCK_READING) {
394
 
                                if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100)) {
395
 
                                        xt_unlock_mutex_ns(&seg->lcs_lock);
396
 
                                        return FAILED;
397
 
                                }
398
 
                                goto retry;
399
 
                        }
400
 
 
401
 
                        *ret_seg = seg;
402
 
                        *ret_block = block;
403
 
                        thread->st_statistics.st_xlog_cache_hit++;
404
 
                        return OK;
405
 
                }
406
 
                block = block->xlb_next;
407
 
        }
408
 
 
409
 
        if (fetch_type == XT_FETCH_TEST) {
410
 
                xt_unlock_mutex_ns(&seg->lcs_lock);
411
 
                *ret_seg = NULL;
412
 
                *ret_block = NULL;
413
 
                thread->st_statistics.st_xlog_cache_miss++;
414
 
                return OK;
415
 
        }
416
 
 
417
 
        /* Block not found: */
418
 
        get_free_block:
419
 
        if (dcg->xlc_next_to_free->xlb_state != XLC_BLOCK_FREE) {
420
 
                xt_unlock_mutex_ns(&seg->lcs_lock);
421
 
                if (!xlog_free_block(dcg->xlc_next_to_free))
422
 
                        return FAILED;
423
 
                xt_lock_mutex_ns(&seg->lcs_lock);
424
 
        }
425
 
 
426
 
        xt_lock_mutex_ns(&dcg->xlc_lock);
427
 
        block = dcg->xlc_next_to_free;
428
 
        if (block->xlb_state != XLC_BLOCK_FREE) {
429
 
                xt_unlock_mutex_ns(&dcg->xlc_lock);
430
 
                goto get_free_block;
431
 
        }
432
 
        dcg->xlc_next_to_free++;
433
 
        if (dcg->xlc_next_to_free == dcg->xlc_blocks_end)
434
 
                dcg->xlc_next_to_free = dcg->xlc_blocks;
435
 
        dcg->xlc_free_count--;
436
 
 
437
 
        if (fetch_type == XT_FETCH_READ) {
438
 
                block->xlb_address = address;
439
 
                block->xlb_log_id = log_id;
440
 
                block->xlb_state = XLC_BLOCK_READING;
441
 
 
442
 
                xt_unlock_mutex_ns(&dcg->xlc_lock);
443
 
 
444
 
                /* Add the block to the hash table: */
445
 
                block->xlb_next = seg->lcs_hash_table[hash_idx];
446
 
                seg->lcs_hash_table[hash_idx] = block;
447
 
 
448
 
                /* Read the block into memory: */
449
 
                xt_unlock_mutex_ns(&seg->lcs_lock);
450
 
 
451
 
                if (!xt_pread_file(file, address, XT_XLC_BLOCK_SIZE, 0, block->xlb_data, &red_size, &thread->st_statistics.st_xlog, thread))
452
 
                        return FAILED;
453
 
                memset(block->xlb_data + red_size, 0, XT_XLC_BLOCK_SIZE - red_size);
454
 
                thread->st_statistics.st_xlog_cache_miss++;
455
 
 
456
 
                xt_lock_mutex_ns(&seg->lcs_lock);
457
 
                block->xlb_state = XLC_BLOCK_CLEAN;
458
 
                xt_cond_wakeall(&seg->lcs_cond);
459
 
        }
460
 
        else {
461
 
                block->xlb_address = address;
462
 
                block->xlb_log_id = log_id;
463
 
                block->xlb_state = XLC_BLOCK_CLEAN;
464
 
                memset(block->xlb_data, 0, XT_XLC_BLOCK_SIZE);
465
 
 
466
 
                xt_unlock_mutex_ns(&dcg->xlc_lock);
467
 
 
468
 
                /* Add the block to the hash table: */
469
 
                block->xlb_next = seg->lcs_hash_table[hash_idx];
470
 
                seg->lcs_hash_table[hash_idx] = block;
471
 
        }
472
 
 
473
 
        *ret_seg = seg;
474
 
        *ret_block = block;
475
 
#ifdef DEBUG_CHECK_CACHE
476
 
        //xt_xlog_check_cache();
477
 
#endif
478
 
        return OK;
479
 
}
480
 
 
481
 
static xtBool xlog_transfer_to_cache(XTOpenFilePtr file, xtLogID log_id, off_t offset, size_t size, xtWord1 *data, XTThreadPtr thread)
482
 
{
483
 
        off_t                           address;
484
 
        XTXLogBlockPtr          block;
485
 
        XTXLogCacheSegPtr       seg;
486
 
        size_t                          boff;
487
 
        size_t                          tfer;
488
 
        xtBool                          read_block = FALSE;
489
 
 
490
 
#ifdef DEBUG_CHECK_CACHE
491
 
        //xt_xlog_check_cache();
492
 
#endif
493
 
        /* We have to read the first block, if we are
494
 
         * not at the begining of the file:
495
 
         */
496
 
        if (offset)
497
 
                read_block = TRUE;
498
 
        address = offset & ~XT_XLC_BLOCK_MASK;
499
 
 
500
 
        boff = (size_t) (offset - address);
501
 
        tfer = XT_XLC_BLOCK_SIZE - boff;
502
 
        if (tfer > size)
503
 
                tfer = size;
504
 
        while (size > 0) {
505
 
                if (!xlog_fetch_block(&block, file, log_id, address, &seg, read_block ? XT_FETCH_READ : XT_FETCH_BLANK, thread)) {
506
 
#ifdef DEBUG_CHECK_CACHE
507
 
                        //xt_xlog_check_cache();
508
 
#endif
509
 
                        return FAILED;
510
 
                }
511
 
                ASSERT_NS(block && block->xlb_state == XLC_BLOCK_CLEAN);
512
 
                memcpy(block->xlb_data + boff, data, tfer);
513
 
                xt_unlock_mutex_ns(&seg->lcs_lock);
514
 
                size -= tfer;
515
 
                data += tfer;
516
 
 
517
 
                /* Following block need not be read
518
 
                 * because we always transfer to the
519
 
                 * end of the file!
520
 
                 */
521
 
                read_block = FALSE;
522
 
                address += XT_XLC_BLOCK_SIZE;
523
 
 
524
 
                boff = 0;
525
 
                tfer = size;
526
 
                if (tfer > XT_XLC_BLOCK_SIZE)
527
 
                        tfer = XT_XLC_BLOCK_SIZE;
528
 
        }
529
 
#ifdef DEBUG_CHECK_CACHE
530
 
        //xt_xlog_check_cache();
531
 
#endif
532
 
        return OK;
533
 
}
534
 
 
535
 
static xtBool xt_xlog_read(XTOpenFilePtr file, xtLogID log_id, off_t offset, size_t size, xtWord1 *data, xtBool load_cache, XTThreadPtr thread)
536
 
{
537
 
        off_t                           address;
538
 
        XTXLogBlockPtr          block;
539
 
        XTXLogCacheSegPtr       seg;
540
 
        size_t                          boff;
541
 
        size_t                          tfer;
542
 
 
543
 
#ifdef DEBUG_CHECK_CACHE
544
 
        //xt_xlog_check_cache();
545
 
#endif
546
 
        address = offset & ~XT_XLC_BLOCK_MASK;
547
 
        boff = (size_t) (offset - address);
548
 
        tfer = XT_XLC_BLOCK_SIZE - boff;
549
 
        if (tfer > size)
550
 
                tfer = size;
551
 
        while (size > 0) {
552
 
                if (!xlog_fetch_block(&block, file, log_id, address, &seg, load_cache ? XT_FETCH_READ : XT_FETCH_TEST, thread))
553
 
                        return FAILED;
554
 
                if (!block) {
555
 
                        size_t red_size;
556
 
 
557
 
                        if (!xt_pread_file(file, address + boff, size, 0, data, &red_size, &thread->st_statistics.st_xlog, thread))
558
 
                                return FAILED;
559
 
                        memset(data + red_size, 0, size - red_size);
560
 
                        return OK;
561
 
                }
562
 
                memcpy(data, block->xlb_data + boff, tfer);
563
 
                xt_unlock_mutex_ns(&seg->lcs_lock);
564
 
                size -= tfer;
565
 
                data += tfer;
566
 
                address += XT_XLC_BLOCK_SIZE;
567
 
                boff = 0;
568
 
                tfer = size;
569
 
                if (tfer > XT_XLC_BLOCK_SIZE)
570
 
                        tfer = XT_XLC_BLOCK_SIZE;
571
 
        }
572
 
#ifdef DEBUG_CHECK_CACHE
573
 
        //xt_xlog_check_cache();
574
 
#endif
575
 
        return OK;
576
 
}
577
 
 
578
 
static xtBool xt_xlog_write(XTOpenFilePtr file, xtLogID log_id, off_t offset, size_t size, xtWord1 *data, XTThreadPtr thread)
579
 
{
580
 
        if (!xt_pwrite_file(file, offset, size, data, &thread->st_statistics.st_xlog, thread))
581
 
                return FAILED;
582
 
        return xlog_transfer_to_cache(file, log_id, offset, size, data, thread);
583
 
}
584
 
 
585
 
/*
586
 
 * -----------------------------------------------------------------------
587
 
 * D A T A B A S E   T R A N S A C T I O N   L O G S
588
 
 */
589
 
 
590
 
void XTDatabaseLog::xlog_setup(XTThreadPtr self, XTDatabaseHPtr db, off_t inp_log_file_size, size_t transaction_buffer_size, int log_count)
591
 
{
592
 
        volatile off_t  log_file_size = inp_log_file_size;
593
 
        size_t                  log_size;
594
 
 
595
 
        try_(a) {
596
 
                memset(this, 0, sizeof(XTDatabaseLogRec));
597
 
 
598
 
                if (log_count <= 1)
599
 
                        log_count = 1;
600
 
                else if (log_count > 1000000)
601
 
                        log_count = 1000000;
602
 
 
603
 
                xl_db = db;
604
 
 
605
 
                xl_log_file_threshold = xt_align_offset(log_file_size, 1024);
606
 
                xl_log_file_count = log_count;
607
 
                xl_size_of_buffers = transaction_buffer_size;
608
 
        
609
 
                xt_init_mutex_with_autoname(self, &xl_write_lock);
610
 
                xt_init_cond(self, &xl_write_cond);
611
 
#ifdef XT_XLOG_WAIT_SPINS
612
 
                xt_writing = 0;
613
 
                xt_waiting = 0;
614
 
#else
615
 
                xt_writing = FALSE;
616
 
#endif
617
 
                xl_log_id = 0;
618
 
                xl_log_file = 0;
619
 
        
620
 
                xt_spinlock_init_with_autoname(self, &xl_buffer_lock);
621
 
 
622
 
                /* Note that we allocate a little bit more for each buffer
623
 
                 * in order to make sure that we can write a trailing record
624
 
                 * to the log buffer.
625
 
                 */
626
 
                log_size = transaction_buffer_size + sizeof(XTXactNewLogEntryDRec);
627
 
                
628
 
                /* Add in order to round the buffer to an integral of 512 */
629
 
                if (log_size % 512)
630
 
                        log_size += (512 - (log_size % 512));
631
 
 
632
 
                xl_write_log_id = 0;
633
 
                xl_write_log_offset = 0;
634
 
                xl_write_buf_pos = 0;
635
 
                xl_write_buf_pos_start = 0;
636
 
                xl_write_buffer = (xtWord1 *) xt_malloc(self, log_size);
637
 
                xl_write_done = TRUE;
638
 
 
639
 
                xl_append_log_id = 0;
640
 
                xl_append_log_offset = 0;
641
 
                xl_append_buf_pos = 0;
642
 
                xl_append_buf_pos_start = 0;
643
 
                xl_append_buffer = (xtWord1 *) xt_malloc(self, log_size);
644
 
 
645
 
                xl_last_flush_time = 10;
646
 
                xl_flush_log_id = 0;
647
 
                xl_flush_log_offset = 0;
648
 
        }
649
 
        catch_(a) {
650
 
                XTException e;
651
 
 
652
 
                xt_enter_exception_handler(self, &e);
653
 
                xlog_exit(self);
654
 
                xt_exit_exception_handler(self, &e);
655
 
                xt_throw(self);
656
 
        }
657
 
        cont_(a);
658
 
}
659
 
 
660
 
xtBool XTDatabaseLog::xlog_set_write_offset(xtLogID log_id, xtLogOffset log_offset, xtLogID max_log_id, XTThreadPtr thread)
661
 
{
662
 
        xl_max_log_id = max_log_id;
663
 
 
664
 
        xl_write_log_id = log_id;
665
 
        xl_write_log_offset = log_offset;
666
 
        xl_write_buf_pos = 0;
667
 
        xl_write_buf_pos_start = 0;
668
 
        xl_write_done = TRUE;
669
 
 
670
 
        xl_append_log_id = log_id;
671
 
        xl_append_log_offset = log_offset;
672
 
        if (log_offset == 0) {
673
 
                XTXactLogHeaderDPtr log_head;
674
 
 
675
 
                log_head = (XTXactLogHeaderDPtr) xl_append_buffer;
676
 
                memset(log_head, 0, sizeof(XTXactLogHeaderDRec));
677
 
                log_head->xh_status_1 = XT_LOG_ENT_HEADER;
678
 
                log_head->xh_checksum_1 = XT_CHECKSUM_1(log_id);
679
 
                XT_SET_DISK_4(log_head->xh_size_4, sizeof(XTXactLogHeaderDRec));
680
 
                XT_SET_DISK_4(log_head->xh_log_id_4, log_id);
681
 
                XT_SET_DISK_2(log_head->xh_version_2, XT_LOG_VERSION_NO);
682
 
                XT_SET_DISK_4(log_head->xh_magic_4, XT_LOG_FILE_MAGIC);
683
 
                xl_append_buf_pos = sizeof(XTXactLogHeaderDRec);
684
 
                xl_append_buf_pos_start = 0;
685
 
        }
686
 
        else {
687
 
                /* Start the log buffer at a block boundary: */
688
 
                size_t buf_pos = (size_t) (log_offset % 512);
689
 
 
690
 
                xl_append_buf_pos = buf_pos;
691
 
                xl_append_buf_pos_start = buf_pos;
692
 
                xl_append_log_offset = log_offset - buf_pos;
693
 
 
694
 
                if (!xlog_open_log(log_id, log_offset, thread))
695
 
                        return FAILED;
696
 
 
697
 
                if (!xt_pread_file(xl_log_file, xl_append_log_offset, buf_pos, buf_pos, xl_append_buffer, NULL, &thread->st_statistics.st_xlog, thread))
698
 
                        return FAILED;
699
 
        }
700
 
 
701
 
        xl_flush_log_id = log_id;
702
 
        xl_flush_log_offset = log_offset;
703
 
        return OK;
704
 
}
705
 
 
706
 
void XTDatabaseLog::xlog_close(XTThreadPtr self)
707
 
{
708
 
        if (xl_log_file) {
709
 
                xt_close_file(self, xl_log_file);
710
 
                xl_log_file = NULL;
711
 
        }
712
 
}
713
 
 
714
 
void XTDatabaseLog::xlog_exit(XTThreadPtr self)
715
 
{
716
 
        xt_spinlock_free(self, &xl_buffer_lock);
717
 
        xt_free_mutex(&xl_write_lock);
718
 
        xt_free_cond(&xl_write_cond);
719
 
        xlog_close(self);
720
 
        if (xl_write_buffer) {
721
 
                xt_free_ns(xl_write_buffer);
722
 
                xl_write_buffer = NULL;
723
 
        }
724
 
        if (xl_append_buffer) {
725
 
                xt_free_ns(xl_append_buffer);
726
 
                xl_append_buffer = NULL;
727
 
        }
728
 
}
729
 
 
730
 
/*
 * Reasons for xlog_append() to write the log buffer to the log file.
 * These are the non-zero values of its local 'write_reason' variable;
 * zero means that no write is required.
 */
#define WR_NO_SPACE             1                       /* Write because there is no space, or some other reason */
731
 
#define WR_FLUSH                2                       /* Normal commit, write and flush */
732
 
 
733
 
xtBool XTDatabaseLog::xlog_flush(XTThreadPtr thread)
734
 
{
735
 
        if (!xlog_flush_pending())
736
 
                return OK;
737
 
        return xlog_append(thread, 0, NULL, 0, NULL, XT_XLOG_WRITE_AND_FLUSH, NULL, NULL);
738
 
}
739
 
 
740
 
xtBool XTDatabaseLog::xlog_flush_pending()
741
 
{
742
 
        xtLogID         req_flush_log_id;
743
 
        xtLogOffset     req_flush_log_offset;
744
 
 
745
 
        xt_lck_slock(&xl_buffer_lock);
746
 
        req_flush_log_id = xl_append_log_id;
747
 
        req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
748
 
        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
749
 
                xt_spinlock_unlock(&xl_buffer_lock);
750
 
                return FALSE;
751
 
        }
752
 
        xt_spinlock_unlock(&xl_buffer_lock);
753
 
        return TRUE;
754
 
}
755
 
 
756
 
/*
 * Write data to the end of the log buffer.
 *
 * flush_log_at_trx_commit indicates whether the caller also
 * requires the log to be flushed after writing the data
 * (XT_XLOG_WRITE_AND_FLUSH requests a flush).
 *
 * This function returns the log ID and offset of
 * the data write position.
 */
765
 
xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, int flush_log_at_trx_commit, xtLogID *log_id, xtLogOffset *log_offset)
766
 
{
767
 
        int                     write_reason = 0;
768
 
        xtLogID         req_flush_log_id;
769
 
        xtLogOffset     req_flush_log_offset;
770
 
        size_t          part_size;
771
 
        xtWord8         flush_time;
772
 
        xtWord2         sum;
773
 
 
774
 
        /* The first size value must be set, of the second is set! */
775
 
        ASSERT_NS(size1 || !size2);
776
 
 
777
 
        if (!size1) {
778
 
                /* Just flush the buffer... */
779
 
                xt_lck_slock(&xl_buffer_lock);
780
 
                write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE;
781
 
                req_flush_log_id = xl_append_log_id;
782
 
                req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
783
 
                xt_spinlock_unlock(&xl_buffer_lock);
784
 
                goto write_log_to_file;
785
 
        }
786
 
        req_flush_log_id = 0;
787
 
        req_flush_log_offset = 0;
788
 
 
789
 
        /*
790
 
         * This is a dirty read, which will send us to the
791
 
         * best starting position:
792
 
         *
793
 
         * If there is space, now, then there is probably
794
 
         * still enough space, after we have locked the
795
 
         * buffer for writting.
796
 
         */
797
 
        if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
798
 
                goto copy_to_log_buffer;
799
 
 
800
 
        /*
801
 
         * There is not enough space in the append buffer.
802
 
         * So we need to write the log, until there is space.
803
 
         */
804
 
        write_reason = WR_NO_SPACE;
805
 
 
806
 
        write_log_to_file:
807
 
        if (write_reason) {
808
 
                /* We need to write for one of 2 reasons: not
809
 
                 * enough space in the buffer, or a flush
810
 
                 * is required.
811
 
                 */
812
 
                xtWord8 then;
813
 
                 
814
 
                /*
815
 
                 * The objective of the following code is to
816
 
                 * pick one writer, out of all threads.
817
 
                 * The rest will wait for the writer.
818
 
                 */
819
 
 
820
 
                if (write_reason == WR_FLUSH) {
821
 
                        /* Before we flush, check if we should wait for running
822
 
                         * transactions that may commit shortly.
823
 
                         */
824
 
                        if (xl_db->db_xn_writer_count - xl_db->db_xn_writer_wait_count - xl_db->db_xn_long_running_count > 0 && xl_last_flush_time) {
825
 
                                /* Wait for about as long as the last flush took,
826
 
                                 * the idea is to saturate the disk with flushing...: */
827
 
                                then = xt_trace_clock() + (xtWord8) xl_last_flush_time;
828
 
                                for (;;) {
829
 
                                        xt_critical_wait();
830
 
                                        /* If a thread leaves this loop because times up, or
831
 
                                         * a thread manages to flush so fast that this thread
832
 
                                         * sleeps during this time, then it could be that
833
 
                                         * the required flush occurs before other conditions
834
 
                                         * of this loop are met!
835
 
                                         *
836
 
                                         * So we check here to make sure that the log has not been
837
 
                                         * flushed as we require:
838
 
                                         */
839
 
                                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
840
 
                                                ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
841
 
                                                return OK;
842
 
                                        }
843
 
 
844
 
                                        if (xl_db->db_xn_writer_count - xl_db->db_xn_writer_wait_count - xl_db->db_xn_long_running_count > 0)
845
 
                                                break;
846
 
                                        if (xt_trace_clock() >= then)
847
 
                                                break;
848
 
                                }
849
 
                        }
850
 
                }
851
 
 
852
 
#ifdef XT_XLOG_WAIT_SPINS
853
 
                /* Spin for 1/1000s: */
854
 
                then = xt_trace_clock() + (xtWord8) 1000;
855
 
                for (;;) {
856
 
                        if (!xt_atomic_tas4(&xt_writing, 1))
857
 
                                break;
858
 
 
859
 
                        /* If I am not the writer, then I just waited for the
860
 
                         * writer. So it may be that my requirements have now
861
 
                         * been met!
862
 
                         */
863
 
                        if (write_reason == WR_FLUSH) {
864
 
                                /* If the reason was to flush, then
865
 
                                 * check the last flush sequence, maybe it is passed
866
 
                                 * our required sequence.
867
 
                                 */
868
 
                                if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
869
 
                                        /* The required flush position of the log is before
870
 
                                         * or equal to the actual flush position. This means the condition
871
 
                                         * for this thread have been satified (via group commit).
872
 
                                         * Nothing more to do!
873
 
                                         */
874
 
                                        ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
875
 
                                        return OK;
876
 
                                }
877
 
                        }
878
 
                        else if (size1) {
879
 
                                /* It may be that there is now space in the append buffer: */
880
 
                                if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
881
 
                                        goto copy_to_log_buffer;
882
 
                        }
883
 
                        else {
884
 
                                /* We are just writing the buffer! */
885
 
                                ASSERT_NS(write_reason == WR_NO_SPACE);
886
 
                                if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0)
887
 
                                        return OK;
888
 
                        }
889
 
 
890
 
                        if (xt_trace_clock() >= then) {
891
 
                                xt_lock_mutex_ns(&xl_write_lock);
892
 
                                xt_waiting++;
893
 
                                if (!xt_timed_wait_cond_ns(&xl_write_cond, &xl_write_lock, 500)) {
894
 
                                        xt_waiting--;
895
 
                                        xt_unlock_mutex_ns(&xl_write_lock);
896
 
                                        return FALSE;
897
 
                                }
898
 
                                xt_waiting--;
899
 
                                xt_unlock_mutex_ns(&xl_write_lock);
900
 
                        }
901
 
                        else
902
 
                                xt_critical_wait();
903
 
                }
904
 
#else
905
 
                xtBool i_am_writer;
906
 
 
907
 
                i_am_writer = FALSE;
908
 
                xt_lock_mutex_ns(&xl_write_lock);
909
 
                if (xt_writing) {
910
 
                        if (!xt_timed_wait_cond_ns(&xl_write_cond, &xl_write_lock, 500)) {
911
 
                                xt_unlock_mutex_ns(&xl_write_lock);
912
 
                                return FALSE;
913
 
                        }
914
 
                }
915
 
                else {
916
 
                        xt_writing = TRUE;
917
 
                        i_am_writer = TRUE;
918
 
                }
919
 
                xt_unlock_mutex_ns(&xl_write_lock);
920
 
 
921
 
                if (!i_am_writer) {
922
 
                        /* If I am not the writer, then I just waited for the
923
 
                         * writer. So it may be that my requirements have now
924
 
                         * been met!
925
 
                         */
926
 
                        if (write_reason == WR_FLUSH) {
927
 
                                /* If the reason was to flush, then
928
 
                                 * check the last flush sequence, maybe it is passed
929
 
                                 * our required sequence.
930
 
                                 */
931
 
                                if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
932
 
                                        /* The required flush position of the log is before
933
 
                                         * or equal to the actual flush position. This means the condition
934
 
                                         * for this thread have been satified (via group commit).
935
 
                                         * Nothing more to do!
936
 
                                         */
937
 
                                        ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
938
 
                                        return OK;
939
 
                                }
940
 
                        }
941
 
                        else if (size1) {
942
 
                                /* It may be that there is now space in the append buffer: */
943
 
                                if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
944
 
                                        goto copy_to_log_buffer;
945
 
                        }
946
 
                        else {
947
 
                                /* We are just writing the buffer! */
948
 
                                ASSERT_NS(write_reason == WR_NO_SPACE);
949
 
                                if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0)
950
 
                                        return OK;
951
 
                        }
952
 
                                
953
 
                        goto write_log_to_file;
954
 
                }
955
 
#endif
956
 
 
957
 
                /* I am the writer, check the conditions, again: */
958
 
                if (write_reason == WR_FLUSH) {
959
 
                        /* The writer wants the log to be flushed to a particular point: */
960
 
                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) <= 0) {
961
 
                                /* The writers required flush position is before or equal
962
 
                                 * to the actual position, so the writer is done...
963
 
                                 */
964
 
#ifdef XT_XLOG_WAIT_SPINS
965
 
                                xt_writing = 0;
966
 
                                if (xt_waiting)
967
 
                                        xt_cond_wakeall(&xl_write_cond);
968
 
#else
969
 
                                xt_writing = FALSE;
970
 
                                xt_cond_wakeall(&xl_write_cond);
971
 
#endif
972
 
                                ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
973
 
                                return OK;
974
 
                        }
975
 
                        /* Not flushed, but what about written? */
976
 
                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) {
977
 
                                /* The write position is after or equal to the required flush
978
 
                                 * position. This means that all we have to do is flush
979
 
                                 * to satisfy the writers condition.
980
 
                                 */
981
 
                                xtBool ok = TRUE;
982
 
 
983
 
                                if (xl_log_id != xl_write_log_id)
984
 
                                        ok = xlog_open_log(xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start), thread);
985
 
 
986
 
                                if (ok) {
987
 
                                        if (xl_db->db_co_busy) {
988
 
                                                /* [(8)] Flush the compactor log. */
989
 
                                                xt_lock_mutex_ns(&xl_db->db_co_dlog_lock);
990
 
                                                ok = xl_db->db_co_thread->st_dlog_buf.dlb_flush_log(TRUE, thread);
991
 
                                                xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
992
 
                                        }
993
 
                                }
994
 
 
995
 
                                if (ok) {
996
 
                                        flush_time = thread->st_statistics.st_xlog.ts_flush_time;
997
 
                                        if ((ok = xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread))) {
998
 
                                                xl_last_flush_time = (u_int) (thread->st_statistics.st_xlog.ts_flush_time - flush_time);
999
 
                                                xl_log_bytes_flushed = xl_log_bytes_written;
1000
 
 
1001
 
                                                xt_lock_mutex_ns(&xl_db->db_wr_lock);
1002
 
                                                xl_flush_log_id = xl_write_log_id;
1003
 
                                                xl_flush_log_offset = xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start);
1004
 
                                                /*
1005
 
                                                 * We have written data to the log, wake the writer to commit
1006
 
                                                * the data to the database.
1007
 
                                                */
1008
 
                                                xlog_wr_log_written(xl_db);
1009
 
                                                xt_unlock_mutex_ns(&xl_db->db_wr_lock);
1010
 
                                        }
1011
 
                                }
1012
 
#ifdef XT_XLOG_WAIT_SPINS
1013
 
                                xt_writing = 0;
1014
 
                                if (xt_waiting)
1015
 
                                        xt_cond_wakeall(&xl_write_cond);
1016
 
#else
1017
 
                                xt_writing = FALSE;
1018
 
                                xt_cond_wakeall(&xl_write_cond);
1019
 
#endif
1020
 
                                ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
1021
 
                                return ok;
1022
 
                        }
1023
 
                }
1024
 
                else if (size1) {
1025
 
                        /* If the amounf of data to be written is 0, then we are just required
1026
 
                         * to write the transaction buffer.
1027
 
                         *
1028
 
                         * If there is space in the buffer, then we can go on
1029
 
                         * to copy our data into the buffer:
1030
 
                         */
1031
 
                        if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) {
1032
 
#ifdef XT_XLOG_WAIT_SPINS
1033
 
                                xt_writing = 0;
1034
 
                                if (xt_waiting)
1035
 
                                        xt_cond_wakeall(&xl_write_cond);
1036
 
#else
1037
 
                                xt_writing = FALSE;
1038
 
                                xt_cond_wakeall(&xl_write_cond);
1039
 
#endif
1040
 
                                goto copy_to_log_buffer;
1041
 
                        }
1042
 
                }
1043
 
                else {
1044
 
                        /* We are just writing the buffer! */
1045
 
                        ASSERT_NS(write_reason == WR_NO_SPACE);
1046
 
                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) {
1047
 
#ifdef XT_XLOG_WAIT_SPINS
1048
 
                                xt_writing = 0;
1049
 
                                if (xt_waiting)
1050
 
                                        xt_cond_wakeall(&xl_write_cond);
1051
 
#else
1052
 
                                xt_writing = FALSE;
1053
 
                                xt_cond_wakeall(&xl_write_cond);
1054
 
#endif
1055
 
                                return OK;
1056
 
                        }
1057
 
                }
1058
 
 
1059
 
                rewrite:
1060
 
                /* If the current write buffer has been written, then
1061
 
                 * switch the logs. Otherwise we must try to existing
1062
 
                 * write buffer.
1063
 
                 */
1064
 
                if (xl_write_done) {
1065
 
                        /* This means that the current write buffer has been writen,
1066
 
                         * i.e. it is empty!
1067
 
                         */
1068
 
                        xt_spinlock_lock(&xl_buffer_lock);
1069
 
                        xtWord1 *tmp_buffer = xl_write_buffer;
1070
 
 
1071
 
                        /* The write position is now the append position: */
1072
 
                        xl_write_log_id = xl_append_log_id;
1073
 
                        xl_write_log_offset = xl_append_log_offset;
1074
 
                        xl_write_buf_pos = xl_append_buf_pos;
1075
 
                        xl_write_buf_pos_start = xl_append_buf_pos_start;
1076
 
                        xl_write_buffer = xl_append_buffer;
1077
 
                        xl_write_done = FALSE;
1078
 
 
1079
 
                        /* We have to maintain 512 byte alignment: */
1080
 
                        ASSERT_NS((xl_write_log_offset % 512) == 0);
1081
 
                        part_size = xl_write_buf_pos % 512;
1082
 
                        if (part_size != 0)
1083
 
                                memcpy(tmp_buffer, xl_write_buffer + xl_write_buf_pos - part_size, part_size);
1084
 
 
1085
 
                        /* The new append position will be after the
1086
 
                         * current append position:
1087
 
                         */
1088
 
                        xl_append_log_offset += xl_append_buf_pos - part_size;
1089
 
                        xl_append_buf_pos = part_size;
1090
 
                        xl_append_buf_pos_start = part_size;
1091
 
                        xl_append_buffer = tmp_buffer; // The old write buffer (which is empty)
1092
 
 
1093
 
                        /*
1094
 
                         * If the append offset exceeds the log threshhold, then
1095
 
                         * we set the append buffer to a new log file:
1096
 
                         *
1097
 
                         * NOTE: This algorithm will cause the log to be overwriten by a maximum
1098
 
                         * of the log buffer size!
1099
 
                         */
1100
 
                        if (xl_append_log_offset >= xl_log_file_threshold) {
1101
 
                                XTXactNewLogEntryDPtr   log_tail;
1102
 
                                XTXactLogHeaderDPtr             log_head;
1103
 
 
1104
 
                                xl_append_log_id++;
1105
 
 
1106
 
                                /* Write the final record to the old log.
1107
 
                                 * There is enough space for this because we allocate the
1108
 
                                 * buffer a little bigger than required.
1109
 
                                 */
1110
 
                                log_tail = (XTXactNewLogEntryDPtr) (xl_write_buffer + xl_write_buf_pos);
1111
 
                                log_tail->xl_status_1 = XT_LOG_ENT_NEW_LOG;
1112
 
                                log_tail->xl_checksum_1 = XT_CHECKSUM_1(xl_append_log_id) ^ XT_CHECKSUM_1(xl_write_log_id);
1113
 
                                XT_SET_DISK_4(log_tail->xl_log_id_4, xl_append_log_id);
1114
 
                                xl_write_buf_pos += sizeof(XTXactNewLogEntryDRec);
1115
 
 
1116
 
                                /* We add the header to the next log. */
1117
 
                                log_head = (XTXactLogHeaderDPtr) xl_append_buffer;
1118
 
                                memset(log_head, 0, sizeof(XTXactLogHeaderDRec));
1119
 
                                log_head->xh_status_1 = XT_LOG_ENT_HEADER;
1120
 
                                log_head->xh_checksum_1 = XT_CHECKSUM_1(xl_append_log_id);
1121
 
                                XT_SET_DISK_4(log_head->xh_size_4, sizeof(XTXactLogHeaderDRec));
1122
 
                                XT_SET_DISK_4(log_head->xh_log_id_4, xl_append_log_id);
1123
 
                                XT_SET_DISK_2(log_head->xh_version_2, XT_LOG_VERSION_NO);
1124
 
                                XT_SET_DISK_4(log_head->xh_magic_4, XT_LOG_FILE_MAGIC);
1125
 
 
1126
 
                                xl_append_log_offset = 0;
1127
 
                                xl_append_buf_pos = sizeof(XTXactLogHeaderDRec);
1128
 
                                xl_append_buf_pos_start = 0;
1129
 
                        }
1130
 
                        xt_spinlock_unlock(&xl_buffer_lock);
1131
 
                        /* We have completed the switch. The append buffer is empty, and
1132
 
                         * other threads can begin to write to it.
1133
 
                         *
1134
 
                         * Meanwhile, this thread will write the write buffer...
1135
 
                         */
1136
 
                }
1137
 
 
1138
 
                /* Make sure we have the correct log open: */
1139
 
                if (xl_log_id != xl_write_log_id) {
1140
 
                        if (!xlog_open_log(xl_write_log_id, xl_write_log_offset, thread))
1141
 
                                goto write_failed;
1142
 
                }
1143
 
 
1144
 
                /* Write the buffer. */
1145
 
                /* Always write an integral number of 512 byte blocks: */
1146
 
                ASSERT_NS((xl_write_log_offset % 512) == 0);
1147
 
                if ((part_size = xl_write_buf_pos % 512)) {
1148
 
                        part_size = 512 - part_size;
1149
 
                        xl_write_buffer[xl_write_buf_pos] = XT_LOG_ENT_END_OF_LOG;
1150
 
#ifdef HAVE_valgrind
1151
 
                        if (part_size > 1)
1152
 
                                memset(xl_write_buffer + xl_write_buf_pos + 1, 0x66, part_size - 1);
1153
 
#endif
1154
 
                        if (!xt_pwrite_file(xl_log_file, xl_write_log_offset, xl_write_buf_pos+part_size, xl_write_buffer, &thread->st_statistics.st_xlog, thread))
1155
 
                                goto write_failed;                      
1156
 
                }
1157
 
                else {
1158
 
                        if (!xt_pwrite_file(xl_log_file, xl_write_log_offset, xl_write_buf_pos, xl_write_buffer, &thread->st_statistics.st_xlog, thread))
1159
 
                                goto write_failed;
1160
 
                }
1161
 
 
1162
 
                /* This part has not been written: */
1163
 
                part_size = xl_write_buf_pos - xl_write_buf_pos_start;
1164
 
 
1165
 
                /* We have written the data to the log, transfer
1166
 
                 * the buffer data into the cache. */
1167
 
                if (!xlog_transfer_to_cache(xl_log_file, xl_log_id, xl_write_log_offset+xl_write_buf_pos_start, part_size, xl_write_buffer+xl_write_buf_pos_start, thread))
1168
 
                        goto write_failed;
1169
 
 
1170
 
                xl_write_done = TRUE;
1171
 
                xl_log_bytes_written += part_size;
1172
 
 
1173
 
                if (write_reason == WR_FLUSH) {
1174
 
                        if (xl_db->db_co_busy) {
1175
 
                                /* [(8)] Flush the compactor log. */
1176
 
                                xt_lock_mutex_ns(&xl_db->db_co_dlog_lock);
1177
 
                                if (!xl_db->db_co_thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
1178
 
                                        xl_log_bytes_written -= part_size;
1179
 
                                        xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
1180
 
                                        goto write_failed;
1181
 
                                }
1182
 
                                xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
1183
 
                        }
1184
 
 
1185
 
                        /* And flush if required: */
1186
 
                        flush_time = thread->st_statistics.st_xlog.ts_flush_time;
1187
 
                        if (!xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread)) {
1188
 
                                xl_log_bytes_written -= part_size;
1189
 
                                goto write_failed;
1190
 
                        }
1191
 
                        xl_last_flush_time = (u_int) (thread->st_statistics.st_xlog.ts_flush_time - flush_time);
1192
 
 
1193
 
                        xl_log_bytes_flushed = xl_log_bytes_written;
1194
 
 
1195
 
                        xt_lock_mutex_ns(&xl_db->db_wr_lock);
1196
 
                        xl_flush_log_id = xl_write_log_id;
1197
 
                        xl_flush_log_offset = xl_write_log_offset + xl_write_buf_pos;
1198
 
                        /*
1199
 
                         * We have written data to the log, wake the writer to commit
1200
 
                         * the data to the database.
1201
 
                         */
1202
 
                        xlog_wr_log_written(xl_db);
1203
 
                        xt_unlock_mutex_ns(&xl_db->db_wr_lock);
1204
 
 
1205
 
                        /* Check that the require flush condition has arrived. */
1206
 
                        if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_flush_log_id, xl_flush_log_offset) > 0)
1207
 
                                /* The required position is still after the current flush
1208
 
                                 * position, continue writing: */
1209
 
                                goto rewrite;
1210
 
 
1211
 
#ifdef XT_XLOG_WAIT_SPINS
1212
 
                        xt_writing = 0;
1213
 
                        if (xt_waiting)
1214
 
                                xt_cond_wakeall(&xl_write_cond);
1215
 
#else
1216
 
                        xt_writing = FALSE;
1217
 
                        xt_cond_wakeall(&xl_write_cond);
1218
 
#endif
1219
 
                        ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
1220
 
                        return OK;
1221
 
                }
1222
 
                else
1223
 
                        xlog_wr_log_written(xl_db);
1224
 
 
1225
 
                /*
1226
 
                 * Check that the buffer is now available, otherwise,
1227
 
                 * switch and write again!
1228
 
                 */
1229
 
                if (xl_append_buf_pos + size1 + size2 > xl_size_of_buffers)
1230
 
                        goto rewrite;
1231
 
 
1232
 
#ifdef XT_XLOG_WAIT_SPINS
1233
 
                xt_writing = 0;
1234
 
                if (xt_waiting)
1235
 
                        xt_cond_wakeall(&xl_write_cond);
1236
 
#else
1237
 
                xt_writing = FALSE;
1238
 
                xt_cond_wakeall(&xl_write_cond);
1239
 
#endif
1240
 
 
1241
 
                if (size1 == 0)
1242
 
                        return OK;
1243
 
        }
1244
 
 
1245
 
        copy_to_log_buffer:
1246
 
        ASSERT_NS(size1);
1247
 
        xt_spinlock_lock(&xl_buffer_lock);
1248
 
        /* Now we have to check again. The check above was a dirty read!
1249
 
         */
1250
 
        if (xl_append_buf_pos + size1 + size2 > xl_size_of_buffers) {
1251
 
                xt_spinlock_unlock(&xl_buffer_lock);
1252
 
                /* Not enough space, write the buffer, and return here. */
1253
 
                write_reason = WR_NO_SPACE;
1254
 
                goto write_log_to_file;
1255
 
        }
1256
 
 
1257
 
        memcpy(xl_append_buffer + xl_append_buf_pos, data1, size1);
1258
 
        if (size2)
1259
 
                memcpy(xl_append_buffer + xl_append_buf_pos + size1, data2, size2);
1260
 
        /* Add the log ID to the checksum!
1261
 
         * This is required because log files are re-used, and we don't
1262
 
         * want the records to be valid when the log is re-used.
1263
 
         */
1264
 
        register XTXactLogBufferDPtr record;
1265
 
 
1266
 
        /*
1267
 
         * Adjust db_xn_writer_count here. It is protected by
1268
 
         * xl_buffer_lock.
1269
 
         */
1270
 
        record = (XTXactLogBufferDPtr) (xl_append_buffer + xl_append_buf_pos);
1271
 
        switch (record->xh.xh_status_1) {
1272
 
                case XT_LOG_ENT_HEADER:
1273
 
                case XT_LOG_ENT_END_OF_LOG:
1274
 
                        break;
1275
 
                case XT_LOG_ENT_REC_MODIFIED:
1276
 
                case XT_LOG_ENT_UPDATE:
1277
 
                case XT_LOG_ENT_UPDATE_BG:
1278
 
                case XT_LOG_ENT_UPDATE_FL:
1279
 
                case XT_LOG_ENT_UPDATE_FL_BG:
1280
 
                case XT_LOG_ENT_INSERT:
1281
 
                case XT_LOG_ENT_INSERT_BG:
1282
 
                case XT_LOG_ENT_INSERT_FL:
1283
 
                case XT_LOG_ENT_INSERT_FL_BG:
1284
 
                case XT_LOG_ENT_DELETE:
1285
 
                case XT_LOG_ENT_DELETE_BG:
1286
 
                case XT_LOG_ENT_DELETE_FL:
1287
 
                case XT_LOG_ENT_DELETE_FL_BG:
1288
 
                        sum = XT_GET_DISK_2(record->xu.xu_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id);
1289
 
                        XT_SET_DISK_2(record->xu.xu_checksum_2, sum);
1290
 
 
1291
 
                        if (!thread->st_xact_writer) {
1292
 
                                thread->st_xact_writer = TRUE;
1293
 
                                thread->st_xact_write_time = xt_db_approximate_time;
1294
 
                                xl_db->db_xn_writer_count++;
1295
 
                                xl_db->db_xn_total_writer_count++;
1296
 
                        }
1297
 
                        break;
1298
 
                case XT_LOG_ENT_REC_REMOVED_BI:
1299
 
                case XT_LOG_ENT_REC_REMOVED_BI_L:
1300
 
                        sum = XT_GET_DISK_2(record->rb.rb_checksum_2) ^ XT_CHECKSUM_2(xl_append_log_id);
1301
 
                        XT_SET_DISK_2(record->rb.rb_checksum_2, sum);
1302
 
                        break;
1303
 
                case XT_LOG_ENT_ROW_NEW:
1304
 
                case XT_LOG_ENT_ROW_NEW_FL:
1305
 
                        record->xl.xl_checksum_1 ^= XT_CHECKSUM_1(xl_append_log_id);
1306
 
 
1307
 
                        if (!thread->st_xact_writer) {
1308
 
                                thread->st_xact_writer = TRUE;
1309
 
                                thread->st_xact_write_time = xt_db_approximate_time;
1310
 
                                xl_db->db_xn_writer_count++;
1311
 
                                xl_db->db_xn_total_writer_count++;
1312
 
                        }
1313
 
                        break;
1314
 
                case XT_LOG_ENT_COMMIT:
1315
 
                case XT_LOG_ENT_ABORT:
1316
 
                        ASSERT_NS(thread->st_xact_writer);
1317
 
                        ASSERT_NS(xl_db->db_xn_writer_count > 0);
1318
 
                        if (thread->st_xact_writer) {
1319
 
                                xl_db->db_xn_writer_count--;
1320
 
                                thread->st_xact_writer = FALSE;
1321
 
                                if (thread->st_xact_long_running) {
1322
 
                                        xl_db->db_xn_long_running_count--;
1323
 
                                        thread->st_xact_long_running = FALSE;
1324
 
                                }
1325
 
                        }
1326
 
                        /* No break required! */
1327
 
                default:
1328
 
                        record->xl.xl_checksum_1 ^= XT_CHECKSUM_1(xl_append_log_id);
1329
 
                        break;
1330
 
        }
1331
 
#ifdef DEBUG
1332
 
        ASSERT_NS(xlog_verify(record, size1 + size2, xl_append_log_id));
1333
 
#endif
1334
 
        if (log_id)
1335
 
                *log_id = xl_append_log_id;
1336
 
        if (log_offset)
1337
 
                *log_offset = xl_append_log_offset + xl_append_buf_pos;
1338
 
        xl_append_buf_pos += size1 + size2;
1339
 
        if (flush_log_at_trx_commit != XT_XLOG_NO_WRITE_NO_FLUSH) {
1340
 
                write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE;
1341
 
                req_flush_log_id = xl_append_log_id;
1342
 
                req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
1343
 
                xt_spinlock_unlock(&xl_buffer_lock);
1344
 
                /* We have written the data already! */
1345
 
                size1 = 0;
1346
 
                size2 = 0;
1347
 
                goto write_log_to_file;
1348
 
        }
1349
 
 
1350
 
        // Failed sometime when outside the spinlock!
1351
 
        ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset + xl_append_buf_pos) <= 0); 
1352
 
        xt_spinlock_unlock(&xl_buffer_lock);
1353
 
 
1354
 
        return OK;
1355
 
 
1356
 
        write_failed:
1357
 
#ifdef XT_XLOG_WAIT_SPINS
1358
 
        xt_writing = 0;
1359
 
        if (xt_waiting)
1360
 
                xt_cond_wakeall(&xl_write_cond);
1361
 
#else
1362
 
        xt_writing = FALSE;
1363
 
        xt_cond_wakeall(&xl_write_cond);
1364
 
#endif
1365
 
        return FAILED;
1366
 
}
1367
 
 
1368
 
/*
1369
 
 * This function does not always delete the log. It may just rename a
1370
 
 * log to a new log which it will need.
1371
 
 * This speeds things up:
1372
 
 *
1373
 
 * - No need to pre-allocate the new log.
1374
 
 * - Log data is already flushed (i.e. disk blocks allocated)
1375
 
 * - Log is already in OS cache.
1376
 
 *
1377
 
 * However, it means that I need to checksum things differently
1378
 
 * on each log to make sure I do not treat an old record
1379
 
 * as valid!
1380
 
 *
1381
 
 * Return OK, FAILED or XT_ERR
1382
 
 */ 
1383
 
int XTDatabaseLog::xlog_delete_log(xtLogID del_log_id, XTThreadPtr thread)
1384
 
{
1385
 
        char    path[PATH_MAX];
1386
 
 
1387
 
        if (xl_max_log_id < xl_write_log_id)
1388
 
                xl_max_log_id = xl_write_log_id;
1389
 
 
1390
 
        xlog_name(PATH_MAX, path, del_log_id);
1391
 
 
1392
 
        if (xt_db_offline_log_function == XT_RECYCLE_LOGS) {
1393
 
                char    new_path[PATH_MAX];
1394
 
                xtLogID new_log_id;
1395
 
                xtBool  ok;
1396
 
 
1397
 
                /* Make sure that the total logs is less than or equal to the log file count
1398
 
                 * (plus dynamic component).
1399
 
                 */
1400
 
                while (xl_max_log_id - del_log_id + 1 <= (xl_log_file_count + xt_log_file_dyn_count) &&
1401
 
                        /* And the number of logs after the current log (including the current log)
1402
 
                         * must be less or equal to the log file count. */
1403
 
                        xl_max_log_id - xl_write_log_id + 1 <= xl_log_file_count) {
1404
 
                        new_log_id = xl_max_log_id+1;
1405
 
                        xlog_name(PATH_MAX, new_path, new_log_id);
1406
 
                        ok = xt_fs_rename(NULL, path, new_path);
1407
 
                        if (ok) {
1408
 
                                xl_max_log_id = new_log_id;
1409
 
                                goto done;
1410
 
                        }
1411
 
                        if (!xt_fs_exists(new_path)) {
1412
 
                                /* Try again later: */
1413
 
                                if (thread->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
1414
 
                                        XT_FILE_IN_USE(thread->t_exception.e_sys_err))
1415
 
                                        return FAILED;
1416
 
 
1417
 
                                return XT_ERR;
1418
 
                        }
1419
 
                        xl_max_log_id = new_log_id;
1420
 
                }
1421
 
        }
1422
 
 
1423
 
        if (xt_db_offline_log_function != XT_KEEP_LOGS) {
1424
 
                if (!xt_fs_delete(NULL, path)) {
1425
 
                        if (thread->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
1426
 
                                XT_FILE_IN_USE(thread->t_exception.e_sys_err))
1427
 
                                return FAILED;
1428
 
 
1429
 
                        return XT_ERR;
1430
 
                }
1431
 
        }
1432
 
 
1433
 
        done:
1434
 
        return OK;
1435
 
}
1436
 
 
1437
 
/* PRIVATE FUNCTIONS */
1438
 
xtBool XTDatabaseLog::xlog_open_log(xtLogID log_id, off_t curr_write_pos, XTThreadPtr thread)
1439
 
{
1440
 
        char    log_path[PATH_MAX];
1441
 
        off_t   eof;
1442
 
 
1443
 
        if (xl_log_id == log_id)
1444
 
                return OK;
1445
 
 
1446
 
        if (xl_log_file) {
1447
 
                if (!xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread))
1448
 
                        return FAILED;
1449
 
                xt_close_file_ns(xl_log_file);
1450
 
                xl_log_file = NULL;
1451
 
                xl_log_id = 0;
1452
 
        }
1453
 
 
1454
 
        xlog_name(PATH_MAX, log_path, log_id);
1455
 
        if (!(xl_log_file = xt_open_file_ns(log_path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 16*1024*1024)))
1456
 
                return FAILED;
1457
 
        /* Allocate space until the required size: */
1458
 
        if (curr_write_pos <  xl_log_file_threshold) {
1459
 
                eof = xt_seek_eof_file(NULL, xl_log_file);
1460
 
                if (eof == 0) {
1461
 
                        /* A new file (bad), we need a greater file count: */
1462
 
                        xt_log_file_dyn_count++;
1463
 
                        xt_log_file_dyn_dec = 4;
1464
 
                }
1465
 
                else {
1466
 
                        /* An existing file (good): */
1467
 
                        if (xt_log_file_dyn_count > 0) {
1468
 
                                if (xt_log_file_dyn_dec > 0)
1469
 
                                        xt_log_file_dyn_dec--;
1470
 
                                else
1471
 
                                        xt_log_file_dyn_count--;
1472
 
                        }
1473
 
                }
1474
 
                if (eof < xl_log_file_threshold) {
1475
 
                        char    buffer[2048];
1476
 
                        size_t  tfer;
1477
 
 
1478
 
                        memset(buffer, 0, 2048);
1479
 
 
1480
 
                        curr_write_pos = xt_align_offset(curr_write_pos, 512);
1481
 
#ifdef PREWRITE_LOG_COMPLETELY
1482
 
                        while (curr_write_pos < xl_log_file_threshold) {
1483
 
                                tfer = 2048;
1484
 
                                if ((off_t) tfer > xl_log_file_threshold - curr_write_pos)
1485
 
                                        tfer = (size_t) (xl_log_file_threshold - curr_write_pos);
1486
 
                                if (curr_write_pos == 0)
1487
 
                                        *buffer = XT_LOG_ENT_END_OF_LOG;
1488
 
                                if (!xt_pwrite_file(xl_log_file, curr_write_pos, tfer, buffer, &thread->st_statistics.st_xlog, thread))
1489
 
                                        return FAILED;
1490
 
                                *buffer = 0;
1491
 
                                curr_write_pos += tfer;
1492
 
                        }
1493
 
#else
1494
 
                        if (curr_write_pos < xl_log_file_threshold) {
1495
 
                                tfer = 2048;
1496
 
                                
1497
 
                                if (curr_write_pos < xl_log_file_threshold - 2048)
1498
 
                                        curr_write_pos = xl_log_file_threshold - 2048;
1499
 
                                if ((off_t) tfer > xl_log_file_threshold - curr_write_pos)
1500
 
                                        tfer = (size_t) (xl_log_file_threshold - curr_write_pos);
1501
 
                                if (!xt_pwrite_file(xl_log_file, curr_write_pos, tfer, buffer, &thread->st_statistics.st_xlog, thread))
1502
 
                                        return FAILED;
1503
 
                        }
1504
 
#endif
1505
 
                }
1506
 
                else if (eof > xl_log_file_threshold + (128 * 1024 * 1024)) {
1507
 
                        if (!xt_set_eof_file(NULL, xl_log_file, xl_log_file_threshold))
1508
 
                                return FAILED;
1509
 
                }
1510
 
        }
1511
 
        xl_log_id = log_id;
1512
 
        return OK;
1513
 
}
1514
 
 
1515
 
void XTDatabaseLog::xlog_name(size_t size, char *path, xtLogID log_id)
1516
 
{
1517
 
        char name[50];
1518
 
 
1519
 
        sprintf(name, "xlog-%lu.xt", (u_long) log_id);
1520
 
        xt_strcpy(size, path, xl_db->db_main_path);
1521
 
        xt_add_system_dir(size, path);
1522
 
        xt_add_dir_char(size, path);
1523
 
        xt_strcat(size, path, name);
1524
 
}
1525
 
 
1526
 
/*
1527
 
 * -----------------------------------------------------------------------
1528
 
 * T H R E A D   T R A N S A C T I O N   B U F F E R
1529
 
 */
1530
 
 
1531
 
xtPublic xtBool xt_xlog_flush_log(struct XTDatabase *db, XTThreadPtr thread)
1532
 
{
1533
 
        return db->db_xlog.xlog_flush(thread);
1534
 
}
1535
 
 
1536
 
xtPublic xtBool xt_xlog_log_data(XTThreadPtr thread, size_t size, XTXactLogBufferDPtr log_entry, int flush_log_at_trx_commit)
1537
 
{
1538
 
        return thread->st_database->db_xlog.xlog_append(thread, size, (xtWord1 *) log_entry, 0, NULL, flush_log_at_trx_commit, NULL, NULL);
1539
 
}
1540
 
 
1541
 
/* Allocate a record from the free list. */
1542
 
xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo op_seq, xtWord1 new_rec_type, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *data, XTThreadPtr thread)
1543
 
{
1544
 
        XTXactLogBufferDRec     log_entry;
1545
 
        size_t                          len;
1546
 
        xtWord4                         sum = 0;
1547
 
        int                                     check_size = 1;
1548
 
        XTXactDataPtr           xact = NULL;
1549
 
        int                                     flush_log_at_trx_commit = XT_XLOG_NO_WRITE_NO_FLUSH;
1550
 
 
1551
 
        switch (status) {
1552
 
                case XT_LOG_ENT_REC_MODIFIED:
1553
 
                case XT_LOG_ENT_UPDATE:
1554
 
                case XT_LOG_ENT_INSERT:
1555
 
                case XT_LOG_ENT_DELETE:
1556
 
                        check_size = 2;
1557
 
                        XT_SET_DISK_4(log_entry.xu.xu_op_seq_4, op_seq);
1558
 
                        XT_SET_DISK_4(log_entry.xu.xu_tab_id_4, tab_id);
1559
 
                        XT_SET_DISK_4(log_entry.xu.xu_rec_id_4, rec_id);
1560
 
                        XT_SET_DISK_2(log_entry.xu.xu_size_2, size);
1561
 
                        len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
1562
 
                        if (!(thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
1563
 
                                /* Add _BG: */
1564
 
                                status++;
1565
 
                                xact = thread->st_xact_data;
1566
 
                                xact->xd_flags |= XT_XN_XAC_LOGGED;
1567
 
                        }
1568
 
                        break;
1569
 
                case XT_LOG_ENT_UPDATE_FL:
1570
 
                case XT_LOG_ENT_INSERT_FL:
1571
 
                case XT_LOG_ENT_DELETE_FL:
1572
 
                        check_size = 2;
1573
 
                        XT_SET_DISK_4(log_entry.xf.xf_op_seq_4, op_seq);
1574
 
                        XT_SET_DISK_4(log_entry.xf.xf_tab_id_4, tab_id);
1575
 
                        XT_SET_DISK_4(log_entry.xf.xf_rec_id_4, rec_id);
1576
 
                        XT_SET_DISK_2(log_entry.xf.xf_size_2, size);
1577
 
                        XT_SET_DISK_4(log_entry.xf.xf_free_rec_id_4, free_rec_id);
1578
 
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
1579
 
                        len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1);
1580
 
                        if (!(thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
1581
 
                                /* Add _BG: */
1582
 
                                status++;
1583
 
                                xact = thread->st_xact_data;
1584
 
                                xact->xd_flags |= XT_XN_XAC_LOGGED;
1585
 
                        }
1586
 
                        break;
1587
 
                case XT_DEFUNKT_REC_FREED:
1588
 
                case XT_DEFUNKT_REC_REMOVED:
1589
 
                case XT_DEFUNKT_REC_REMOVED_EXT:
1590
 
                        ASSERT_NS(size == 1 + XT_XACT_ID_SIZE + sizeof(XTTabRecFreeDRec));
1591
 
                        XT_SET_DISK_4(log_entry.fr.fr_op_seq_4, op_seq);
1592
 
                        XT_SET_DISK_4(log_entry.fr.fr_tab_id_4, tab_id);
1593
 
                        XT_SET_DISK_4(log_entry.fr.fr_rec_id_4, rec_id);
1594
 
                        len = offsetof(XTactFreeRecEntryDRec, fr_stat_id_1);
1595
 
                        break;
1596
 
                case XT_LOG_ENT_REC_REMOVED_BI:
1597
 
                        check_size = 2;
1598
 
                        XT_SET_DISK_4(log_entry.rb.rb_op_seq_4, op_seq);
1599
 
                        XT_SET_DISK_4(log_entry.rb.rb_tab_id_4, tab_id);
1600
 
                        XT_SET_DISK_4(log_entry.rb.rb_rec_id_4, rec_id);
1601
 
                        XT_SET_DISK_2(log_entry.rb.rb_size_2, size);
1602
 
                        log_entry.rb.rb_new_rec_type_1 = new_rec_type;
1603
 
                        sum ^= XT_CHECKSUM4_REC((xtWord4) new_rec_type);
1604
 
                        len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1);
1605
 
                        break;
1606
 
                case XT_LOG_ENT_REC_REMOVED_BI_L:
1607
 
                        check_size = 2;
1608
 
                        XT_SET_DISK_4(log_entry.bl.bl_op_seq_4, op_seq);
1609
 
                        XT_SET_DISK_4(log_entry.bl.bl_tab_id_4, tab_id);
1610
 
                        XT_SET_DISK_4(log_entry.bl.bl_rec_id_4, rec_id);
1611
 
                        XT_SET_DISK_2(log_entry.bl.bl_size_2, size);
1612
 
                        XT_SET_DISK_4(log_entry.bl.bl_prev_rec_id_4, free_rec_id);
1613
 
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
1614
 
                        log_entry.bl.bl_new_rec_type_1 = new_rec_type;
1615
 
                        sum ^= XT_CHECKSUM4_REC((xtWord4) new_rec_type);
1616
 
                        len = offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1);
1617
 
                        break;
1618
 
                case XT_LOG_ENT_REC_MOVED:
1619
 
                        ASSERT_NS(size == 8);
1620
 
                        XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
1621
 
                        XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
1622
 
                        XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
1623
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
1624
 
                        break;
1625
 
                case XT_LOG_ENT_REC_CLEANED:
1626
 
                        ASSERT_NS(size == offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE);
1627
 
                        XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
1628
 
                        XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
1629
 
                        XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
1630
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
1631
 
                        break;
1632
 
                case XT_LOG_ENT_REC_CLEANED_1:
1633
 
                        ASSERT_NS(size == 1);
1634
 
                        XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
1635
 
                        XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
1636
 
                        XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
1637
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
1638
 
                        break;
1639
 
                case XT_LOG_ENT_REC_UNLINKED:
1640
 
                        ASSERT_NS(size == offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE);
1641
 
                        XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
1642
 
                        XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
1643
 
                        XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
1644
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
1645
 
                        break;
1646
 
                case XT_LOG_ENT_ROW_NEW:
1647
 
                        ASSERT_NS(size == 0);
1648
 
                        XT_SET_DISK_4(log_entry.xa.xa_op_seq_4, op_seq);
1649
 
                        XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab_id);
1650
 
                        XT_SET_DISK_4(log_entry.xa.xa_row_id_4, rec_id);
1651
 
                        len = offsetof(XTactRowAddedEntryDRec, xa_row_id_4) + XT_ROW_ID_SIZE;
1652
 
                        break;
1653
 
                case XT_LOG_ENT_ROW_NEW_FL:
1654
 
                        ASSERT_NS(size == 0);
1655
 
                        XT_SET_DISK_4(log_entry.xa.xa_op_seq_4, op_seq);
1656
 
                        XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab_id);
1657
 
                        XT_SET_DISK_4(log_entry.xa.xa_row_id_4, rec_id);
1658
 
                        XT_SET_DISK_4(log_entry.xa.xa_free_list_4, free_rec_id);
1659
 
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
1660
 
                        len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4) + XT_ROW_ID_SIZE;
1661
 
                        break;
1662
 
                case XT_LOG_ENT_ROW_ADD_REC:
1663
 
                case XT_LOG_ENT_ROW_SET:
1664
 
                case XT_LOG_ENT_ROW_FREED:
1665
 
                        ASSERT_NS(size == sizeof(XTTabRowRefDRec));
1666
 
                        XT_SET_DISK_4(log_entry.wr.wr_op_seq_4, op_seq);
1667
 
                        XT_SET_DISK_4(log_entry.wr.wr_tab_id_4, tab_id);
1668
 
                        XT_SET_DISK_4(log_entry.wr.wr_row_id_4, rec_id);
1669
 
                        len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4);
1670
 
                        break;
1671
 
                case XT_LOG_ENT_PREPARE:
1672
 
                        check_size = 2;
1673
 
                        XT_SET_DISK_4(log_entry.xp.xp_xact_id_4, op_seq);
1674
 
                        log_entry.xp.xp_xa_len_1 = (xtWord1) size;
1675
 
                        len = offsetof(XTXactPrepareEntryDRec, xp_xa_data);
1676
 
                        flush_log_at_trx_commit = xt_db_flush_log_at_trx_commit;
1677
 
                        break;
1678
 
                default:
1679
 
                        ASSERT_NS(FALSE);
1680
 
                        len = 0;
1681
 
                        break;
1682
 
        }
1683
 
 
1684
 
        xtWord1 *dptr = data;
1685
 
        xtWord4 g;
1686
 
 
1687
 
        sum ^= op_seq ^ (tab_id << 8) ^ XT_CHECKSUM4_REC(rec_id);
1688
 
        if ((g = sum & 0xF0000000)) {
1689
 
                sum = sum ^ (g >> 24);
1690
 
                sum = sum ^ g;
1691
 
        }
1692
 
        for (u_int i=0; i<(u_int) size; i++) {
1693
 
                sum = (sum << 4) + *dptr;
1694
 
                if ((g = sum & 0xF0000000)) {
1695
 
                        sum = sum ^ (g >> 24);
1696
 
                        sum = sum ^ g;
1697
 
                }
1698
 
                dptr++;
1699
 
        }
1700
 
 
1701
 
        log_entry.xh.xh_status_1 = status;
1702
 
        if (check_size == 1) {
1703
 
                log_entry.xh.xh_checksum_1 = XT_CHECKSUM_1(sum);
1704
 
        }
1705
 
        else {
1706
 
                xtWord2 c;
1707
 
                
1708
 
                c = XT_CHECKSUM_2(sum);
1709
 
                XT_SET_DISK_2(log_entry.xu.xu_checksum_2, c);
1710
 
        }
1711
 
#ifdef PRINT_TABLE_MODIFICATIONS
1712
 
        xt_print_log_record(0, 0, &log_entry);
1713
 
#endif
1714
 
        if (xact)
1715
 
                return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, &xact->xd_begin_log, &xact->xd_begin_offset);
1716
 
 
1717
 
        return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, NULL, NULL);
1718
 
}
1719
 
 
1720
 
/*
1721
 
 * -----------------------------------------------------------------------
1722
 
 * S E Q U E N T I A L   L O G   R E A D I N G
1723
 
 */
1724
 
 
1725
 
/*
1726
 
 * Use the log buffer for sequential reading the log.
1727
 
 */
1728
 
xtBool XTDatabaseLog::xlog_seq_init(XTXactSeqReadPtr seq, size_t buffer_size, xtBool load_cache)
1729
 
{
1730
 
        seq->xseq_buffer_size = buffer_size;
1731
 
        seq->xseq_load_cache = load_cache;
1732
 
 
1733
 
        seq->xseq_log_id = 0;
1734
 
        seq->xseq_log_file = NULL;
1735
 
        seq->xseq_log_eof = 0;
1736
 
 
1737
 
        seq->xseq_buf_log_offset = 0;
1738
 
        seq->xseq_buffer_len = 0;
1739
 
        seq->xseq_buffer = (xtWord1 *) xt_malloc_ns(buffer_size);
1740
 
 
1741
 
        seq->xseq_rec_log_id = 0;
1742
 
        seq->xseq_rec_log_offset = 0;
1743
 
        seq->xseq_record_len = 0;
1744
 
 
1745
 
        return seq->xseq_buffer != NULL;
1746
 
}
1747
 
 
1748
 
void XTDatabaseLog::xlog_seq_exit(XTXactSeqReadPtr seq)
1749
 
{
1750
 
        xlog_seq_close(seq);
1751
 
        if (seq->xseq_buffer) {
1752
 
                xt_free_ns(seq->xseq_buffer);
1753
 
                seq->xseq_buffer = NULL;
1754
 
        }
1755
 
}
1756
 
 
1757
 
void XTDatabaseLog::xlog_seq_close(XTXactSeqReadPtr seq)
1758
 
{
1759
 
        if (seq->xseq_log_file) {
1760
 
                xt_close_file_ns(seq->xseq_log_file);
1761
 
                seq->xseq_log_file = NULL;
1762
 
        }
1763
 
        seq->xseq_log_id = 0;
1764
 
        seq->xseq_log_eof = 0;
1765
 
}
1766
 
 
1767
 
xtBool XTDatabaseLog::xlog_seq_start(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, xtBool XT_UNUSED(missing_ok))
1768
 
{
1769
 
        if (seq->xseq_rec_log_id != log_id) {
1770
 
                seq->xseq_rec_log_id = log_id;
1771
 
                seq->xseq_buf_log_offset = seq->xseq_rec_log_offset;
1772
 
                seq->xseq_buffer_len = 0;
1773
 
        }
1774
 
 
1775
 
        /* Windows version: this will help to switch
1776
 
         * to the new log file.
1777
 
         * Due to reading from the log buffers, this was
1778
 
         * not always done!
1779
 
         */
1780
 
        if (seq->xseq_log_id != log_id) {
1781
 
                if (seq->xseq_log_file) {
1782
 
                        xt_close_file_ns(seq->xseq_log_file);
1783
 
                        seq->xseq_log_file = NULL;
1784
 
                }
1785
 
        }
1786
 
        seq->xseq_rec_log_offset = log_offset;
1787
 
        seq->xseq_record_len = 0;
1788
 
        return OK;
1789
 
}
1790
 
 
1791
 
size_t XTDatabaseLog::xlog_bytes_to_write()
1792
 
{
1793
 
        xtLogID                                 log_id;
1794
 
        xtLogOffset                             log_offset;
1795
 
        xtLogID                                 to_log_id;
1796
 
        xtLogOffset                             to_log_offset;
1797
 
        size_t                                  byte_count = 0;
1798
 
 
1799
 
        log_id = xl_db->db_wr_log_id;
1800
 
        log_offset = xl_db->db_wr_log_offset;
1801
 
        to_log_id = xl_db->db_xlog.xl_flush_log_id;
1802
 
        to_log_offset = xl_db->db_xlog.xl_flush_log_offset;
1803
 
 
1804
 
        /* Assume the logs have the threshold: */
1805
 
        if (log_id < to_log_id) {
1806
 
                if (log_offset < xt_db_log_file_threshold)
1807
 
                        byte_count = (size_t) (xt_db_log_file_threshold - log_offset);
1808
 
                log_offset = 0;
1809
 
                log_id++;
1810
 
        }
1811
 
        while (log_id < to_log_id) {
1812
 
                byte_count += (size_t) xt_db_log_file_threshold;
1813
 
                log_id++;
1814
 
        }
1815
 
        if (log_offset < to_log_offset)
1816
 
                byte_count += (size_t) (to_log_offset - log_offset);
1817
 
 
1818
 
        return byte_count;
1819
 
}
1820
 
 
1821
 
xtBool XTDatabaseLog::xlog_read_from_cache(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, size_t size, off_t eof, xtWord1 *buffer, size_t *data_read, XTThreadPtr thread)
1822
 
{
1823
 
        /* xseq_log_file could be NULL because xseq_log_id is not set
1824
 
         * to zero when xseq_log_file is set to NULL!
1825
 
         * This bug caused a crash in TeamDrive.
1826
 
         */
1827
 
        if (seq->xseq_log_id != log_id || !seq->xseq_log_file) {
1828
 
                char path[PATH_MAX];
1829
 
 
1830
 
                if (seq->xseq_log_file) {
1831
 
                        xt_close_file_ns(seq->xseq_log_file);
1832
 
                        seq->xseq_log_file = NULL;
1833
 
                }
1834
 
 
1835
 
                xlog_name(PATH_MAX, path, log_id);
1836
 
                if (!xt_open_file_ns(&seq->xseq_log_file, path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))
1837
 
                        return FAILED;
1838
 
                if (!seq->xseq_log_file) {
1839
 
                        if (data_read)
1840
 
                                *data_read = 0;
1841
 
                        return OK;
1842
 
                }
1843
 
                seq->xseq_log_id = log_id;
1844
 
                seq->xseq_log_eof = 0;
1845
 
        }
1846
 
 
1847
 
        if (!eof) {
1848
 
                if (!seq->xseq_log_eof)
1849
 
                        seq->xseq_log_eof = xt_seek_eof_file(NULL, seq->xseq_log_file);
1850
 
                eof = seq->xseq_log_eof;
1851
 
        }
1852
 
 
1853
 
        if (log_offset >= eof) {
1854
 
                if (data_read)
1855
 
                        *data_read = 0;
1856
 
                return OK;
1857
 
        }
1858
 
 
1859
 
        if ((off_t) size > eof - log_offset)
1860
 
                size = (size_t) (eof - log_offset);
1861
 
 
1862
 
        if (data_read)
1863
 
                *data_read = size;
1864
 
        return xt_xlog_read(seq->xseq_log_file, seq->xseq_log_id, log_offset, size, buffer, seq->xseq_load_cache, thread);
1865
 
}
1866
 
 
1867
 
xtBool XTDatabaseLog::xlog_rnd_read(XTXactSeqReadPtr seq, xtLogID log_id, xtLogOffset log_offset, size_t size, xtWord1 *buffer, size_t *data_read, XTThreadPtr thread)
1868
 
{
1869
 
        /* Fast track to reading from cache: */
1870
 
        if (log_id < xl_write_log_id)
1871
 
                return xlog_read_from_cache(seq, log_id, log_offset, size, 0, buffer, data_read, thread);
1872
 
        
1873
 
        if (log_id == xl_write_log_id && log_offset + (xtLogOffset) size <= xl_write_log_offset)
1874
 
                return xlog_read_from_cache(seq, log_id, log_offset, size, xl_write_log_offset, buffer, data_read, thread);
1875
 
 
1876
 
        /* May be in the log write or append buffer: */
1877
 
        xt_lck_slock(&xl_buffer_lock);
1878
 
 
1879
 
        if (log_id < xl_write_log_id) {
1880
 
                xt_spinlock_unlock(&xl_buffer_lock);
1881
 
                return xlog_read_from_cache(seq, log_id, log_offset, size, 0, buffer, data_read, thread);
1882
 
        }
1883
 
 
1884
 
        /* Check the write buffer: */
1885
 
        if (log_id == xl_write_log_id) {
1886
 
                if (log_offset + (xtLogOffset) size <= xl_write_log_offset) {
1887
 
                        xt_spinlock_unlock(&xl_buffer_lock);
1888
 
                        return xlog_read_from_cache(seq, log_id, log_offset, size, xl_write_log_offset, buffer, data_read, thread);
1889
 
                }
1890
 
 
1891
 
                if (log_offset < xl_write_log_offset + (xtLogOffset) xl_write_buf_pos) {
1892
 
                        /* Reading partially from the write buffer: */
1893
 
                        if (log_offset >= xl_write_log_offset) {
1894
 
                                /* Completely in the buffer. */
1895
 
                                off_t offset = log_offset - xl_write_log_offset;
1896
 
                                
1897
 
                                if (size > xl_write_buf_pos - offset)
1898
 
                                        size = (size_t) (xl_write_buf_pos - offset);
1899
 
                                
1900
 
                                memcpy(buffer, xl_write_buffer + offset, size);
1901
 
                                if (data_read)
1902
 
                                        *data_read = size;
1903
 
                                goto unlock_and_return;
1904
 
                        }
1905
 
 
1906
 
                        /* End part in the buffer: */
1907
 
                        size_t tfer;
1908
 
                        
1909
 
                        /* The amount that will be taken from the cache: */
1910
 
                        tfer = (size_t) (xl_write_log_offset - log_offset);
1911
 
                        
1912
 
                        size -= tfer;
1913
 
                        if (size > xl_write_buf_pos)
1914
 
                                size = xl_write_buf_pos;
1915
 
                        
1916
 
                        memcpy(buffer + tfer, xl_write_buffer, size);
1917
 
 
1918
 
                        xt_spinlock_unlock(&xl_buffer_lock);
1919
 
                        
1920
 
                        /* Read the first part from the cache: */
1921
 
                        if (data_read)
1922
 
                                *data_read = tfer + size;                       
1923
 
                        return xlog_read_from_cache(seq, log_id, log_offset, tfer, log_offset + tfer, buffer, NULL, thread);
1924
 
                }
1925
 
        }
1926
 
 
1927
 
        /* Check the append buffer: */
1928
 
        if (log_id == xl_append_log_id) {
1929
 
                if (log_offset >= xl_append_log_offset && log_offset < xl_append_log_offset + (xtLogOffset) xl_append_buf_pos) {
1930
 
                        /* It is in the append buffer: */
1931
 
                        size_t offset = (size_t) (log_offset - xl_append_log_offset);
1932
 
                        
1933
 
                        if (size > xl_append_buf_pos - offset)
1934
 
                                size = xl_append_buf_pos - offset;
1935
 
                        
1936
 
                        memcpy(buffer, xl_append_buffer + offset, size);
1937
 
                        if (data_read)
1938
 
                                *data_read = size;
1939
 
                        goto unlock_and_return;
1940
 
                }
1941
 
        }
1942
 
 
1943
 
        if (xl_append_log_id == 0) {
1944
 
                /* This catches the case that
1945
 
                 * the log has not yet been initialized
1946
 
                 * for writing.
1947
 
                 */
1948
 
                xt_spinlock_unlock(&xl_buffer_lock);
1949
 
                return xlog_read_from_cache(seq, log_id, log_offset, size, 0, buffer, data_read, thread);
1950
 
        }
1951
 
 
1952
 
        if (data_read)
1953
 
                *data_read = 0;
1954
 
 
1955
 
        unlock_and_return:
1956
 
        xt_spinlock_unlock(&xl_buffer_lock);
1957
 
        return OK;
1958
 
}
1959
 
 
1960
 
xtBool XTDatabaseLog::xlog_write_thru(XTXactSeqReadPtr seq, size_t size, xtWord1 *data, XTThreadPtr thread)
1961
 
{
1962
 
        if (!xt_xlog_write(seq->xseq_log_file, seq->xseq_log_id, seq->xseq_rec_log_offset, size, data, thread))
1963
 
                return FALSE;
1964
 
        xl_log_bytes_written += size;
1965
 
        seq->xseq_rec_log_offset += size;
1966
 
        return TRUE;
1967
 
}
1968
 
 
1969
 
xtBool XTDatabaseLog::xlog_verify(XTXactLogBufferDPtr record, size_t rec_size, xtLogID log_id)
1970
 
{
1971
 
        xtWord4         sum = 0;
1972
 
        xtOpSeqNo       op_seq;
1973
 
        xtTableID       tab_id;
1974
 
        xtRecordID      rec_id, free_rec_id;
1975
 
        int                     check_size = 1;
1976
 
        xtWord1         *dptr;
1977
 
        xtWord4         g;
1978
 
 
1979
 
        switch (record->xh.xh_status_1) {
1980
 
                case XT_LOG_ENT_HEADER:
1981
 
                        if (record->xh.xh_checksum_1 != XT_CHECKSUM_1(log_id))
1982
 
                                return FALSE;
1983
 
                        if (XT_LOG_HEAD_MAGIC(record, rec_size) != XT_LOG_FILE_MAGIC)
1984
 
                                return FALSE;
1985
 
                        if (rec_size >= offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
1986
 
                                if (XT_GET_DISK_4(record->xh.xh_log_id_4) != log_id)
1987
 
                                        return FALSE;
1988
 
                        }
1989
 
                        return TRUE;
1990
 
                case XT_LOG_ENT_NEW_LOG:
1991
 
                case XT_LOG_ENT_DEL_LOG:
1992
 
                        return record->xl.xl_checksum_1 == (XT_CHECKSUM_1(XT_GET_DISK_4(record->xl.xl_log_id_4)) ^ XT_CHECKSUM_1(log_id));
1993
 
                case XT_LOG_ENT_NEW_TAB:
1994
 
                        return record->xl.xl_checksum_1 == (XT_CHECKSUM_1(XT_GET_DISK_4(record->xt.xt_tab_id_4)) ^ XT_CHECKSUM_1(log_id));
1995
 
                case XT_LOG_ENT_COMMIT:
1996
 
                case XT_LOG_ENT_ABORT:
1997
 
                        sum = XT_CHECKSUM4_XACT(XT_GET_DISK_4(record->xe.xe_xact_id_4)) ^ XT_CHECKSUM4_XACT(XT_GET_DISK_4(record->xe.xe_not_used_4));
1998
 
                        return record->xe.xe_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
1999
 
                case XT_LOG_ENT_CLEANUP:
2000
 
                        sum = XT_CHECKSUM4_XACT(XT_GET_DISK_4(record->xc.xc_xact_id_4));
2001
 
                        return record->xc.xc_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
2002
 
                case XT_LOG_ENT_REC_MODIFIED:
2003
 
                case XT_LOG_ENT_UPDATE:
2004
 
                case XT_LOG_ENT_INSERT:
2005
 
                case XT_LOG_ENT_DELETE:
2006
 
                case XT_LOG_ENT_UPDATE_BG:
2007
 
                case XT_LOG_ENT_INSERT_BG:
2008
 
                case XT_LOG_ENT_DELETE_BG:
2009
 
                        check_size = 2;
2010
 
                        op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);
2011
 
                        tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
2012
 
                        rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
2013
 
                        dptr = &record->xu.xu_rec_type_1;
2014
 
                        rec_size -= offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
2015
 
                        break;
2016
 
                case XT_LOG_ENT_UPDATE_FL:
2017
 
                case XT_LOG_ENT_INSERT_FL:
2018
 
                case XT_LOG_ENT_DELETE_FL:
2019
 
                case XT_LOG_ENT_UPDATE_FL_BG:
2020
 
                case XT_LOG_ENT_INSERT_FL_BG:
2021
 
                case XT_LOG_ENT_DELETE_FL_BG:
2022
 
                        check_size = 2;
2023
 
                        op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);
2024
 
                        tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
2025
 
                        rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
2026
 
                        free_rec_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);
2027
 
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
2028
 
                        dptr = &record->xf.xf_rec_type_1;
2029
 
                        rec_size -= offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1);
2030
 
                        break;
2031
 
                case XT_DEFUNKT_REC_FREED:
2032
 
                case XT_DEFUNKT_REC_REMOVED:
2033
 
                case XT_DEFUNKT_REC_REMOVED_EXT:
2034
 
                        op_seq = XT_GET_DISK_4(record->fr.fr_op_seq_4);
2035
 
                        tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);
2036
 
                        rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
2037
 
                        dptr = &record->fr.fr_stat_id_1;
2038
 
                        rec_size -= offsetof(XTactFreeRecEntryDRec, fr_stat_id_1);
2039
 
                        break;
2040
 
                case XT_LOG_ENT_REC_REMOVED_BI:
2041
 
                        check_size = 2;
2042
 
                        op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);
2043
 
                        tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
2044
 
                        rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
2045
 
                        free_rec_id = (xtWord4) record->rb.rb_new_rec_type_1;
2046
 
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
2047
 
                        dptr = &record->rb.rb_rec_type_1;
2048
 
                        rec_size -= offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1);
2049
 
                        break;
2050
 
                case XT_LOG_ENT_REC_REMOVED_BI_L:
2051
 
                        check_size = 2;
2052
 
                        op_seq = XT_GET_DISK_4(record->bl.bl_op_seq_4);
2053
 
                        tab_id = XT_GET_DISK_4(record->bl.bl_tab_id_4);
2054
 
                        rec_id = XT_GET_DISK_4(record->bl.bl_rec_id_4);
2055
 
                        free_rec_id = XT_GET_DISK_4(record->bl.bl_prev_rec_id_4);
2056
 
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
2057
 
                        free_rec_id = (xtWord4) record->bl.bl_new_rec_type_1;
2058
 
                        sum ^= XT_CHECKSUM4_REC(free_rec_id);
2059
 
                        dptr = &record->bl.bl_rec_type_1;
2060
 
                        rec_size -= offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1);
2061
 
                        break;
2062
 
                case XT_LOG_ENT_REC_MOVED:
2063
 
                case XT_LOG_ENT_REC_CLEANED:
2064
 
                case XT_LOG_ENT_REC_CLEANED_1:
2065
 
                case XT_LOG_ENT_REC_UNLINKED:
2066
 
                        op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);
2067
 
                        tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
2068
 
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
2069
 
                        dptr = &record->xw.xw_rec_type_1;
2070
 
                        rec_size -= offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
2071
 
                        break;
2072
 
                case XT_LOG_ENT_ROW_NEW:
2073
 
                case XT_LOG_ENT_ROW_NEW_FL:
2074
 
                        op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);
2075
 
                        tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
2076
 
                        rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
2077
 
                        if (record->xh.xh_status_1 == XT_LOG_ENT_ROW_NEW) {
2078
 
                                dptr = (xtWord1 *) record + offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
2079
 
                                rec_size -= offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
2080
 
                        }
2081
 
                        else {
2082
 
                                free_rec_id = XT_GET_DISK_4(record->xa.xa_free_list_4);
2083
 
                                sum ^= XT_CHECKSUM4_REC(free_rec_id);
2084
 
                                dptr = (xtWord1 *) record + sizeof(XTactRowAddedEntryDRec);
2085
 
                                rec_size -= sizeof(XTactRowAddedEntryDRec);
2086
 
                        }
2087
 
                        break;
2088
 
                case XT_LOG_ENT_ROW_ADD_REC:
2089
 
                case XT_LOG_ENT_ROW_SET:
2090
 
                case XT_LOG_ENT_ROW_FREED:
2091
 
                        op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);
2092
 
                        tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);
2093
 
                        rec_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
2094
 
                        dptr = (xtWord1 *) &record->wr.wr_ref_id_4;
2095
 
                        rec_size -= offsetof(XTactWriteRowEntryDRec, wr_ref_id_4);
2096
 
                        break;
2097
 
                case XT_LOG_ENT_OP_SYNC:
2098
 
                        return record->xl.xl_checksum_1 == (XT_CHECKSUM_1(XT_GET_DISK_4(record->os.os_time_4)) ^ XT_CHECKSUM_1(log_id));
2099
 
                case XT_LOG_ENT_NO_OP:
2100
 
                        sum = XT_GET_DISK_4(record->no.no_tab_id_4) ^ XT_GET_DISK_4(record->no.no_op_seq_4);
2101
 
                        return record->xe.xe_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
2102
 
                case XT_LOG_ENT_END_OF_LOG:
2103
 
                        return FALSE;
2104
 
                case XT_LOG_ENT_PREPARE:
2105
 
                        check_size = 2;
2106
 
                        op_seq = XT_GET_DISK_4(record->xp.xp_xact_id_4);
2107
 
                        tab_id = 0;
2108
 
                        rec_id = 0;
2109
 
                        dptr = record->xp.xp_xa_data;
2110
 
                        rec_size -= offsetof(XTXactPrepareEntryDRec, xp_xa_data);
2111
 
                        break;
2112
 
                default:
2113
 
                        ASSERT_NS(FALSE);
2114
 
                        return FALSE;
2115
 
        }
2116
 
 
2117
 
        sum ^= (xtWord4) op_seq ^ ((xtWord4) tab_id << 8) ^ XT_CHECKSUM4_REC(rec_id);
2118
 
 
2119
 
        if ((g = sum & 0xF0000000)) {
2120
 
                sum = sum ^ (g >> 24);
2121
 
                sum = sum ^ g;
2122
 
        }
2123
 
        for (u_int i=0; i<(u_int) rec_size; i++) {
2124
 
                sum = (sum << 4) + *dptr;
2125
 
                if ((g = sum & 0xF0000000)) {
2126
 
                        sum = sum ^ (g >> 24);
2127
 
                        sum = sum ^ g;
2128
 
                }
2129
 
                dptr++;
2130
 
        }
2131
 
 
2132
 
        if (check_size == 1) {
2133
 
                if (record->xh.xh_checksum_1 != (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id))) {
2134
 
                        return FAILED;
2135
 
                }
2136
 
        }
2137
 
        else {
2138
 
                if (XT_GET_DISK_2(record->xu.xu_checksum_2) != (XT_CHECKSUM_2(sum) ^ XT_CHECKSUM_2(log_id))) {
2139
 
                        return FAILED;
2140
 
                }
2141
 
        }
2142
 
        return TRUE;
2143
 
}
2144
 
 
2145
 
xtBool XTDatabaseLog::xlog_seq_next(XTXactSeqReadPtr seq, XTXactLogBufferDPtr *ret_entry, xtBool verify, XTThreadPtr thread)
2146
 
{
2147
 
        XTXactLogBufferDPtr     record;
2148
 
        size_t                          tfer;
2149
 
        size_t                          len;
2150
 
        size_t                          rec_offset;
2151
 
        size_t                          max_rec_len;
2152
 
        size_t                          size;
2153
 
        u_int                           check_size = 1;
2154
 
 
2155
 
        /* Go to the next record (xseq_record_len must be initialized
2156
 
         * to 0 for this to work.
2157
 
         */
2158
 
        seq->xseq_rec_log_offset += seq->xseq_record_len;
2159
 
        seq->xseq_record_len = 0;
2160
 
 
2161
 
        if (seq->xseq_rec_log_offset < seq->xseq_buf_log_offset ||
2162
 
                seq->xseq_rec_log_offset >= seq->xseq_buf_log_offset + (xtLogOffset) seq->xseq_buffer_len) {
2163
 
                /* The current position is nowhere near the buffer, read data into the
2164
 
                 * buffer:
2165
 
                 */
2166
 
                tfer = seq->xseq_buffer_size;
2167
 
                if (!xlog_rnd_read(seq, seq->xseq_rec_log_id, seq->xseq_rec_log_offset, tfer, seq->xseq_buffer, &tfer, thread))
2168
 
                        return FAILED;
2169
 
                seq->xseq_buf_log_offset = seq->xseq_rec_log_offset;
2170
 
                seq->xseq_buffer_len = tfer;
2171
 
 
2172
 
                /* Should we go to the next log? */
2173
 
                if (!tfer) {
2174
 
                        goto return_empty;
2175
 
                }
2176
 
        }
2177
 
 
2178
 
        /* The start of the record is in the buffer: */
2179
 
        read_from_buffer:
2180
 
        rec_offset = (size_t) (seq->xseq_rec_log_offset - seq->xseq_buf_log_offset);
2181
 
        max_rec_len = seq->xseq_buffer_len - rec_offset;
2182
 
        size = 0;
2183
 
 
2184
 
        /* Check the type of record: */
2185
 
        record = (XTXactLogBufferDPtr) (seq->xseq_buffer + rec_offset);
2186
 
        switch (record->xh.xh_status_1) {
2187
 
                case XT_LOG_ENT_HEADER:
2188
 
                        len = offsetof(XTXactLogHeaderDRec, xh_version_2) + 2;
2189
 
                        if (len > max_rec_len)
2190
 
                                /* The size is not in the buffer: */
2191
 
                                goto read_more;
2192
 
                        len = (size_t) XT_GET_DISK_4(record->xh.xh_size_4);
2193
 
                        break;
2194
 
                case XT_LOG_ENT_NEW_LOG:
2195
 
                case XT_LOG_ENT_DEL_LOG:
2196
 
                        len = sizeof(XTXactNewLogEntryDRec);
2197
 
                        break;
2198
 
                case XT_LOG_ENT_NEW_TAB:
2199
 
                        len = sizeof(XTXactNewTabEntryDRec);
2200
 
                        break;
2201
 
                case XT_LOG_ENT_COMMIT:
2202
 
                case XT_LOG_ENT_ABORT:
2203
 
                        len = sizeof(XTXactEndEntryDRec);
2204
 
                        break;
2205
 
                case XT_LOG_ENT_CLEANUP:
2206
 
                        len = sizeof(XTXactCleanupEntryDRec);
2207
 
                        break;
2208
 
                case XT_LOG_ENT_REC_MODIFIED:
2209
 
                case XT_LOG_ENT_UPDATE:
2210
 
                case XT_LOG_ENT_INSERT:
2211
 
                case XT_LOG_ENT_DELETE:
2212
 
                case XT_LOG_ENT_UPDATE_BG:
2213
 
                case XT_LOG_ENT_INSERT_BG:
2214
 
                case XT_LOG_ENT_DELETE_BG:
2215
 
                        check_size = 2;
2216
 
                        len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
2217
 
                        if (len > max_rec_len)
2218
 
                                /* The size is not in the buffer: */
2219
 
                                goto read_more;
2220
 
                        len += (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
2221
 
                        break;
2222
 
                case XT_LOG_ENT_UPDATE_FL:
2223
 
                case XT_LOG_ENT_INSERT_FL:
2224
 
                case XT_LOG_ENT_DELETE_FL:
2225
 
                case XT_LOG_ENT_UPDATE_FL_BG:
2226
 
                case XT_LOG_ENT_INSERT_FL_BG:
2227
 
                case XT_LOG_ENT_DELETE_FL_BG:
2228
 
                        check_size = 2;
2229
 
                        len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1);
2230
 
                        if (len > max_rec_len)
2231
 
                                /* The size is not in the buffer: */
2232
 
                                goto read_more;
2233
 
                        len += (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
2234
 
                        break;
2235
 
                case XT_DEFUNKT_REC_FREED:
2236
 
                case XT_DEFUNKT_REC_REMOVED:
2237
 
                case XT_DEFUNKT_REC_REMOVED_EXT:
2238
 
                        /* [(7)] REMOVE is now a extended version of FREE! */
2239
 
                        len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);
2240
 
                        break;
2241
 
                case XT_LOG_ENT_REC_REMOVED_BI:
2242
 
                        check_size = 2;
2243
 
                        len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1);
2244
 
                        if (len > max_rec_len)
2245
 
                                /* The size is not in the buffer: */
2246
 
                                goto read_more;
2247
 
                        len += (size_t) XT_GET_DISK_2(record->rb.rb_size_2);
2248
 
                        break;
2249
 
                case XT_LOG_ENT_REC_REMOVED_BI_L:
2250
 
                        check_size = 2;
2251
 
                        len = offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1);
2252
 
                        if (len > max_rec_len)
2253
 
                                /* The size is not in the buffer: */
2254
 
                                goto read_more;
2255
 
                        len += (size_t) XT_GET_DISK_2(record->bl.bl_size_2);
2256
 
                        break;
2257
 
                case XT_LOG_ENT_REC_MOVED:
2258
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;
2259
 
                        break;
2260
 
                case XT_LOG_ENT_REC_CLEANED:
2261
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
2262
 
                        break;
2263
 
                case XT_LOG_ENT_REC_CLEANED_1:
2264
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;
2265
 
                        break;
2266
 
                case XT_LOG_ENT_REC_UNLINKED:
2267
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
2268
 
                        break;
2269
 
                case XT_LOG_ENT_ROW_NEW:
2270
 
                        len = offsetof(XTactRowAddedEntryDRec, xa_row_id_4) + XT_ROW_ID_SIZE;
2271
 
                        break;
2272
 
                case XT_LOG_ENT_ROW_NEW_FL:
2273
 
                        len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4) + XT_ROW_ID_SIZE;
2274
 
                        break;
2275
 
                case XT_LOG_ENT_ROW_ADD_REC:
2276
 
                case XT_LOG_ENT_ROW_SET:
2277
 
                case XT_LOG_ENT_ROW_FREED:
2278
 
                        len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + XT_REF_ID_SIZE;
2279
 
                        break;
2280
 
                case XT_LOG_ENT_OP_SYNC:
2281
 
                        len = sizeof(XTactOpSyncEntryDRec);
2282
 
                        break;
2283
 
                case XT_LOG_ENT_NO_OP:
2284
 
                        len = sizeof(XTactNoOpEntryDRec);
2285
 
                        break;
2286
 
                case XT_LOG_ENT_END_OF_LOG: {
2287
 
                        off_t eof = seq->xseq_log_eof, adjust;
2288
 
                        
2289
 
                        if (eof > seq->xseq_rec_log_offset) {
2290
 
                                adjust = eof - seq->xseq_rec_log_offset;
2291
 
 
2292
 
                                seq->xseq_record_len = (size_t) adjust;
2293
 
                        }
2294
 
                        goto return_empty;
2295
 
                }
2296
 
                case XT_LOG_ENT_PREPARE:
2297
 
                        check_size = 2;
2298
 
                        len = offsetof(XTXactPrepareEntryDRec, xp_xa_data);
2299
 
                        if (len > max_rec_len)
2300
 
                                /* The size is not in the buffer: */
2301
 
                                goto read_more;
2302
 
                        len += (size_t) record->xp.xp_xa_len_1;
2303
 
                        break;
2304
 
                default:
2305
 
                        /* It is possible to land here after a crash, if the
2306
 
                         * log was not completely written.
2307
 
                         */
2308
 
                        seq->xseq_record_len = 0;
2309
 
                        goto return_empty;
2310
 
        }
2311
 
 
2312
 
        ASSERT_NS(len <= seq->xseq_buffer_size);
2313
 
        if (len <= max_rec_len) {
2314
 
                if (verify) {
2315
 
                        if (!xlog_verify(record, len, seq->xseq_rec_log_id)) {
2316
 
                                goto return_empty;
2317
 
                        }
2318
 
                }
2319
 
 
2320
 
                /* The record is completely in the buffer: */
2321
 
                seq->xseq_record_len = len;
2322
 
                *ret_entry = record;
2323
 
                return OK;
2324
 
        }
2325
 
        
2326
 
        /* The record is partially in the buffer. */
2327
 
        memmove(seq->xseq_buffer, seq->xseq_buffer + rec_offset, max_rec_len);
2328
 
        seq->xseq_buf_log_offset += rec_offset;
2329
 
        seq->xseq_buffer_len = max_rec_len;
2330
 
 
2331
 
        /* Read the rest, as far as possible: */
2332
 
        tfer = seq->xseq_buffer_size - max_rec_len;
2333
 
        if (!xlog_rnd_read(seq, seq->xseq_rec_log_id, seq->xseq_buf_log_offset + max_rec_len, tfer, seq->xseq_buffer + max_rec_len, &tfer, thread))
2334
 
                return FAILED;
2335
 
        seq->xseq_buffer_len += tfer;
2336
 
 
2337
 
        if (seq->xseq_buffer_len < len) {
2338
 
                /* A partial record is in the log, must be the end of the log: */
2339
 
                goto return_empty;
2340
 
        }
2341
 
 
2342
 
        /* The record is now completely in the buffer: */
2343
 
        seq->xseq_record_len = len;
2344
 
        *ret_entry = (XTXactLogBufferDPtr) seq->xseq_buffer;
2345
 
        return OK;
2346
 
 
2347
 
        read_more:
2348
 
        ASSERT_NS(len <= seq->xseq_buffer_size);
2349
 
        memmove(seq->xseq_buffer, seq->xseq_buffer + rec_offset, max_rec_len);
2350
 
        seq->xseq_buf_log_offset += rec_offset;
2351
 
        seq->xseq_buffer_len = max_rec_len;
2352
 
 
2353
 
        /* Read the rest, as far as possible: */
2354
 
        tfer = seq->xseq_buffer_size - max_rec_len;
2355
 
        if (!xlog_rnd_read(seq, seq->xseq_rec_log_id, seq->xseq_buf_log_offset + max_rec_len, tfer, seq->xseq_buffer + max_rec_len, &tfer, thread))
2356
 
                return FAILED;
2357
 
        seq->xseq_buffer_len += tfer;
2358
 
 
2359
 
        if (seq->xseq_buffer_len < len + size) {
2360
 
                /* We did not get as much as we need, return an empty record: */
2361
 
                goto return_empty;
2362
 
        }
2363
 
 
2364
 
        goto read_from_buffer;
2365
 
 
2366
 
        return_empty:
2367
 
        *ret_entry = NULL;
2368
 
        return OK;
2369
 
}
2370
 
 
2371
 
void XTDatabaseLog::xlog_seq_skip(XTXactSeqReadPtr seq, size_t size)
2372
 
{
2373
 
        seq->xseq_record_len += size;
2374
 
}
2375
 
 
2376
 
/* ----------------------------------------------------------------------
2377
 
 * W R I T E R    P R O C E S S
2378
 
 */
2379
 
 
2380
 
/*
2381
 
 * The log has been written. Wake the writer to commit the
2382
 
 * data to disk, if the transaction log cache is full.
2383
 
 *
2384
 
 * Data may not be written to the database until it has been
2385
 
 * flushed to the log.
2386
 
 *
2387
 
 * This is because there is no way to undo changes to the
2388
 
 * database.
2389
 
 *
2390
 
 * However, I have discovered that writing constantly in the
2391
 
 * background can disturb the I/O in the foreground.
2392
 
 *
2393
 
 * So we can delay the writing of the database. But we should
2394
 
 * not delay it longer than we have transaction log cache.
2395
 
 *
2396
 
 * If so, the data that we need will fall out of the cache
2397
 
 * and we will have to read it again.
2398
 
 */
2399
 
static void xlog_wr_log_written(XTDatabaseHPtr db)
2400
 
{
2401
 
        if (db->db_wr_idle) {
2402
 
                xtWord8 cached_bytes;
2403
 
 
2404
 
                /* Determine if the cached log data is about to fall out of the cache. */
2405
 
                cached_bytes = db->db_xlog.xl_log_bytes_written - db->db_xlog.xl_log_bytes_read;
2406
 
                /* The limit is 75%: */
2407
 
                if (cached_bytes >= xt_xlog_cache.xlc_upper_limit) {
2408
 
                        if (!xt_broadcast_cond_ns(&db->db_wr_cond))
2409
 
                                xt_log_and_clear_exception_ns();
2410
 
                }
2411
 
        }
2412
 
}
2413
 
 
2414
 
/* Reason codes used by xlog_wr_wait_for_write_condition(): */
#define XT_MORE_TO_WRITE		1
#define XT_FREER_WAITING		2
#define XT_NO_ACTIVITY			3
#define XT_LOG_CACHE_FULL		4
#define XT_CHECKPOINT_REQ		5
#define XT_THREAD_WAITING		6
#define XT_TIME_TO_WRITE		7
2421
 
 
2422
 
/*
2423
 
 * Wait for a reason to write the data from the log to the database.
2424
 
 * This can be one of the following:
2425
 
 *
2426
 
 */
2427
 
static int xlog_wr_wait_for_write_condition(XTThreadPtr self, XTDatabaseHPtr db, int old_reason)
2428
 
{
2429
 
        xtXactID        last_xn_id;
2430
 
        xtWord8         cached_bytes;
2431
 
        int                     reason = XT_MORE_TO_WRITE;
2432
 
 
2433
 
#ifdef TRACE_WRITER_ACTIVITY
2434
 
        printf("WRITER --- DONE\n");
2435
 
#endif
2436
 
 
2437
 
        xt_lock_mutex(self, &db->db_wr_lock);
2438
 
        pushr_(xt_unlock_mutex, &db->db_wr_lock);
2439
 
 
2440
 
        /*
2441
 
         * Wake the freeer if it is waiting for this writer, before
2442
 
         * we go to sleep!
2443
 
         */
2444
 
        if (db->db_wr_freeer_waiting) {
2445
 
                if (!xt_broadcast_cond_ns(&db->db_wr_cond))
2446
 
                        xt_log_and_clear_exception_ns();
2447
 
        }
2448
 
 
2449
 
        if (db->db_wr_flush_point_log_id == db->db_xlog.xl_flush_log_id &&
2450
 
                db->db_wr_flush_point_log_offset == db->db_xlog.xl_flush_log_offset) {
2451
 
                /* Wake the checkpointer to flush the indexes:
2452
 
                 * PMC 15.05.2008 - Not doing this anymore!
2453
 
                 *
2454
 
                 * PMC 1.07.2009 - Added this notification again, if the
2455
 
                 * reason why we wrote is because the checkpointer may have
2456
 
                 * been waiting:
2457
 
                 */
2458
 
                if (old_reason == XT_CHECKPOINT_REQ)
2459
 
                        xt_wake_checkpointer(self, db);
2460
 
 
2461
 
                /* Sleep as long as the flush point has not changed, from the last
2462
 
                 * target flush point.
2463
 
                 */
2464
 
                while (!self->t_quit &&
2465
 
                        db->db_wr_flush_point_log_id == db->db_xlog.xl_flush_log_id &&
2466
 
                        db->db_wr_flush_point_log_offset == db->db_xlog.xl_flush_log_offset &&
2467
 
                        reason != XT_LOG_CACHE_FULL &&
2468
 
                        reason != XT_TIME_TO_WRITE &&
2469
 
                        reason != XT_CHECKPOINT_REQ) {
2470
 
 
2471
 
                        /*
2472
 
                         * Sleep as long as there is no reason to write any more...
2473
 
                         */
2474
 
                        while (!self->t_quit) {
2475
 
                                last_xn_id = db->db_xn_curr_id;
2476
 
                                db->db_wr_idle = XT_THREAD_IDLE;
2477
 
                                xt_timed_wait_cond(self, &db->db_wr_cond, &db->db_wr_lock, 500);
2478
 
                                db->db_wr_idle = XT_THREAD_BUSY;
2479
 
                                /* These are the reasons for doing work: */
2480
 
                                /* The free'er thread is waiting for the writer: */
2481
 
                                if (db->db_wr_freeer_waiting) {
2482
 
                                        reason = XT_FREER_WAITING;
2483
 
                                        break;
2484
 
                                }
2485
 
                                /* Some thread is waiting for the writer: */
2486
 
                                if (db->db_wr_thread_waiting) {
2487
 
                                        reason = XT_THREAD_WAITING;
2488
 
                                        break;
2489
 
                                }
2490
 
                                /* Check if the cache will soon overflow... */
2491
 
                                ASSERT(db->db_xlog.xl_log_bytes_written >= db->db_xlog.xl_log_bytes_read);
2492
 
                                ASSERT(db->db_xlog.xl_log_bytes_written >= db->db_xlog.xl_log_bytes_flushed);
2493
 
                                /* Sanity check: */
2494
 
                                ASSERT(db->db_xlog.xl_log_bytes_written < db->db_xlog.xl_log_bytes_read + 500000000);
2495
 
                                /* This is the amount of data still to be written: */
2496
 
                                cached_bytes = db->db_xlog.xl_log_bytes_written - db->db_xlog.xl_log_bytes_read;
2497
 
                                /* The limit is 75%: */
2498
 
                                if (cached_bytes >= xt_xlog_cache.xlc_upper_limit) {
2499
 
                                        reason = XT_LOG_CACHE_FULL;
2500
 
                                        break;
2501
 
                                }
2502
 
                                
2503
 
                                /* Create a system variable which specifies the write frequency. */
2504
 
                                if (xt_db_record_write_threshold && cached_bytes >= xt_db_record_write_threshold) {
2505
 
                                        reason = XT_TIME_TO_WRITE;
2506
 
                                        break;
2507
 
                                }
2508
 
                                
2509
 
                                /* Check if we are holding up a checkpoint: */
2510
 
                                if (db->db_restart.xres_cp_required ||
2511
 
                                        db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
2512
 
                                        /* Enough data has been written for a checkpoint: */
2513
 
                                        if (!db->db_restart.xres_is_checkpoint_pending(db->db_wr_log_id, db->db_wr_log_offset)) {
2514
 
                                                /* But not enough data has been written for a checkpoint: */
2515
 
                                                reason = XT_CHECKPOINT_REQ;
2516
 
                                                break;
2517
 
                                        }
2518
 
                                }
2519
 
                                /* There is no activity, if the current ID has not changed during
2520
 
                                 * the wait, and the sweeper has nothing to do, and the checkpointer.
2521
 
                                 */
2522
 
                                if (db->db_xn_curr_id == last_xn_id &&
2523
 
                                        /* Changed xt_xn_get_curr_id(db) to db->db_xn_curr_id,
2524
 
                                         * This should work because we are not concerned about the difference
2525
 
                                         * between xt_xn_get_curr_id(db) and db->db_xn_curr_id,
2526
 
                                         * Which is just a matter of when transactions we can expect ot find
2527
 
                                         * in memory (see {GAP-INC-ADD-XACT})
2528
 
                                         */
2529
 
                                        xt_xn_is_before(db->db_xn_curr_id, db->db_xn_to_clean_id) && // db->db_xn_curr_id < db->db_xn_to_clean_id
2530
 
                                        !db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
2531
 
                                        /* There seems to be no activity at the moment.
2532
 
                                         * this might be a good time to write the log data.
2533
 
                                         */
2534
 
                                        reason = XT_NO_ACTIVITY;
2535
 
                                        break;
2536
 
                                }
2537
 
                        }
2538
 
                }
2539
 
        }
2540
 
        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2541
 
 
2542
 
        if (reason == XT_LOG_CACHE_FULL || reason == XT_TIME_TO_WRITE || reason == XT_CHECKPOINT_REQ) {
2543
 
                /* Make sure that we have something to write: */
2544
 
                if (db->db_xlog.xlog_bytes_to_write() < 2 * 1204 * 1024)
2545
 
                        xt_xlog_flush_log(db, self);
2546
 
        }
2547
 
 
2548
 
#ifdef TRACE_WRITER_ACTIVITY
2549
 
        switch (reason) {
2550
 
                case XT_MORE_TO_WRITE:  printf("WRITER --- still more to write...\n"); break;
2551
 
                case XT_FREER_WAITING:  printf("WRITER --- free'er waiting for writer...\n"); break;
2552
 
                case XT_NO_ACTIVITY:    printf("WRITER --- no activity...\n"); break;
2553
 
                case XT_LOG_CACHE_FULL: printf("WRITER --- running out of log cache...\n"); break;
2554
 
                case XT_CHECKPOINT_REQ: printf("WRITER --- enough flushed for a checkpoint...\n"); break;
2555
 
                case XT_THREAD_WAITING: printf("WRITER --- thread waiting for writer...\n"); break;
2556
 
                case XT_TIME_TO_WRITE:  printf("WRITER --- limit of 12MB reached, time to write...\n"); break;
2557
 
        }
2558
 
#endif
2559
 
        
2560
 
        return reason;
2561
 
}
2562
 
 
2563
 
static void xlog_wr_could_go_faster(XTThreadPtr self, XTDatabaseHPtr db)
2564
 
{
2565
 
        if (db->db_wr_faster) {
2566
 
                if (!db->db_wr_fast) {
2567
 
                        xt_set_normal_priority(self);
2568
 
                        db->db_wr_fast = TRUE;
2569
 
                }
2570
 
                db->db_wr_faster = FALSE;
2571
 
        }
2572
 
}
2573
 
 
2574
 
static void xlog_wr_could_go_slower(XTThreadPtr self, XTDatabaseHPtr db)
2575
 
{
2576
 
        if (db->db_wr_fast && !db->db_wr_faster) {
2577
 
                xt_set_low_priority(self);
2578
 
                db->db_wr_fast = FALSE;
2579
 
        }
2580
 
}
2581
 
 
2582
 
static void xlog_wr_main(XTThreadPtr self)
2583
 
{
2584
 
        XTDatabaseHPtr          db = self->st_database;
2585
 
        XTWriterStatePtr        ws;
2586
 
        XTXactLogBufferDPtr     record;
2587
 
        int                                     reason = XT_NO_ACTIVITY;
2588
 
 
2589
 
        xt_set_low_priority(self);
2590
 
 
2591
 
        alloczr_(ws, xt_free_writer_state, sizeof(XTWriterStateRec), XTWriterStatePtr);
2592
 
        ws->ws_db = db;
2593
 
        ws->ws_in_recover = FALSE;
2594
 
 
2595
 
        if (!db->db_xlog.xlog_seq_init(&ws->ws_seqread, xt_db_log_buffer_size, FALSE))
2596
 
                xt_throw(self);
2597
 
 
2598
 
        if (!db->db_xlog.xlog_seq_start(&ws->ws_seqread, db->db_wr_log_id, db->db_wr_log_offset, FALSE))
2599
 
                xt_throw(self);
2600
 
 
2601
 
        while (!self->t_quit) {
2602
 
                while (!self->t_quit) {
2603
 
                        /* Determine the point to which we can write.
2604
 
                         * This is the current log flush point!
2605
 
                         */
2606
 
                        xt_lock_mutex_ns(&db->db_wr_lock);
2607
 
                        db->db_wr_flush_point_log_id = db->db_xlog.xl_flush_log_id;
2608
 
                        db->db_wr_flush_point_log_offset = db->db_xlog.xl_flush_log_offset;
2609
 
                        xt_unlock_mutex_ns(&db->db_wr_lock);
2610
 
 
2611
 
                        if (xt_comp_log_pos(db->db_wr_log_id, db->db_wr_log_offset, db->db_wr_flush_point_log_id, db->db_wr_flush_point_log_offset) >= 0) {
2612
 
                                break;
2613
 
                        }
2614
 
 
2615
 
                        while (!self->t_quit) {
2616
 
                                xlog_wr_could_go_faster(self, db);
2617
 
 
2618
 
                                /* This is the restart position: */
2619
 
                                xt_lock_mutex(self, &db->db_wr_lock);
2620
 
                                pushr_(xt_unlock_mutex, &db->db_wr_lock);
2621
 
                                db->db_wr_log_id = ws->ws_seqread.xseq_rec_log_id;
2622
 
                                db->db_wr_log_offset = ws->ws_seqread.xseq_rec_log_offset +  ws->ws_seqread.xseq_record_len;
2623
 
                                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2624
 
 
2625
 
                                if (xt_comp_log_pos(db->db_wr_log_id, db->db_wr_log_offset, db->db_wr_flush_point_log_id, db->db_wr_flush_point_log_offset) >= 0) {
2626
 
                                        break;
2627
 
                                }
2628
 
 
2629
 
                                /* Apply all changes that have been flushed to the log, to the
2630
 
                                 * database.
2631
 
                                 */
2632
 
                                if (!db->db_xlog.xlog_seq_next(&ws->ws_seqread, &record, FALSE, self))
2633
 
                                        xt_throw(self);
2634
 
                                if (!record) {
2635
 
                                        break;
2636
 
                                }
2637
 
                                switch (record->xl.xl_status_1) {
2638
 
                                        case XT_LOG_ENT_HEADER:
2639
 
                                                break;
2640
 
                                        case XT_LOG_ENT_NEW_LOG:
2641
 
                                                if (!db->db_xlog.xlog_seq_start(&ws->ws_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, TRUE))
2642
 
                                                        xt_throw(self);
2643
 
                                                break;
2644
 
                                        case XT_LOG_ENT_NEW_TAB:
2645
 
                                        case XT_LOG_ENT_COMMIT:
2646
 
                                        case XT_LOG_ENT_ABORT:
2647
 
                                        case XT_LOG_ENT_CLEANUP:
2648
 
                                        case XT_LOG_ENT_OP_SYNC:
2649
 
                                        case XT_LOG_ENT_PREPARE:
2650
 
                                                break;
2651
 
                                        case XT_LOG_ENT_DEL_LOG:
2652
 
                                                xtLogID log_id;
2653
 
 
2654
 
                                                log_id = XT_GET_DISK_4(record->xl.xl_log_id_4);
2655
 
                                                xt_dl_set_to_delete(self, db, log_id);
2656
 
                                                break;
2657
 
                                        default:
2658
 
                                                xt_xres_apply_in_order(self, ws, ws->ws_seqread.xseq_rec_log_id, ws->ws_seqread.xseq_rec_log_offset, record);
2659
 
                                                break;
2660
 
                                }
2661
 
                                /* Count the number of bytes read from the log: */
2662
 
                                db->db_xlog.xl_log_bytes_read += ws->ws_seqread.xseq_record_len;
2663
 
                        }
2664
 
                }
2665
 
 
2666
 
#ifdef XT_SORT_REC_WRITES
2667
 
                /* Flush because the freeer may be waiting! */
2668
 
                xt_xres_flush_all(self, ws);
2669
 
#endif
2670
 
 
2671
 
                if (ws->ws_ot) {
2672
 
                        xt_db_return_table_to_pool(self, ws->ws_ot);
2673
 
                        ws->ws_ot = NULL;
2674
 
                }
2675
 
 
2676
 
                xlog_wr_could_go_slower(self, db);
2677
 
 
2678
 
                /* Note, we delay writing the database for a maximum of
2679
 
                 * 2 seconds.
2680
 
                 */
2681
 
                reason = xlog_wr_wait_for_write_condition(self, db, reason);
2682
 
        }
2683
 
 
2684
 
        freer_(); // xt_free_writer_state(ss)
2685
 
}
2686
 
 
2687
 
static void *xlog_wr_run_thread(XTThreadPtr self)
2688
 
{
2689
 
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
2690
 
        int                             count;
2691
 
        void                    *mysql_thread;
2692
 
 
2693
 
        mysql_thread = myxt_create_thread();
2694
 
 
2695
 
        while (!self->t_quit) {
2696
 
                try_(a) {
2697
 
                        /*
2698
 
                         * The garbage collector requires that the database
2699
 
                         * is in use because.
2700
 
                         */
2701
 
                        xt_use_database(self, db, XT_FOR_WRITER);
2702
 
 
2703
 
                        /* This action is both safe and required (see {BACKGROUND-RELEASE-DB}) */
2704
 
                        xt_heap_release(self, self->st_database);
2705
 
 
2706
 
                        xlog_wr_main(self);
2707
 
                }
2708
 
                catch_(a) {
2709
 
                        /* This error is "normal"! */
2710
 
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
2711
 
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
2712
 
                                self->t_exception.e_sys_err == SIGTERM))
2713
 
                                xt_log_and_clear_exception(self);
2714
 
                }
2715
 
                cont_(a);
2716
 
 
2717
 
                /* Avoid releasing the database (done above) */
2718
 
                self->st_database = NULL;
2719
 
                xt_unuse_database(self, self);
2720
 
 
2721
 
                /* After an exception, pause before trying again... */
2722
 
                /* Number of seconds */
2723
 
#ifdef DEBUG
2724
 
                count = 10;
2725
 
#else
2726
 
                count = 2*60;
2727
 
#endif
2728
 
                db->db_wr_idle = XT_THREAD_INERR;
2729
 
                while (!self->t_quit && count > 0) {
2730
 
                        sleep(1);
2731
 
                        count--;
2732
 
                }
2733
 
                db->db_wr_idle = XT_THREAD_BUSY;
2734
 
        }
2735
 
 
2736
 
   /*
2737
 
        * {MYSQL-THREAD-KILL}
2738
 
        myxt_destroy_thread(mysql_thread, TRUE);
2739
 
        */
2740
 
        return NULL;
2741
 
}
2742
 
 
2743
 
static void xlog_wr_free_thread(XTThreadPtr self, void *data)
2744
 
{
2745
 
        XTDatabaseHPtr db = (XTDatabaseHPtr) data;
2746
 
 
2747
 
        if (db->db_wr_thread) {
2748
 
                xt_lock_mutex(self, &db->db_wr_lock);
2749
 
                pushr_(xt_unlock_mutex, &db->db_wr_lock);
2750
 
                db->db_wr_thread = NULL;
2751
 
                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2752
 
        }
2753
 
}
2754
 
 
2755
 
xtPublic void xt_start_writer(XTThreadPtr self, XTDatabaseHPtr db)
2756
 
{
2757
 
        char name[PATH_MAX];
2758
 
 
2759
 
        sprintf(name, "WR-%s", xt_last_directory_of_path(db->db_main_path));
2760
 
        xt_remove_dir_char(name);
2761
 
        db->db_wr_thread = xt_create_daemon(self, name);
2762
 
        xt_set_thread_data(db->db_wr_thread, db, xlog_wr_free_thread);
2763
 
        xt_run_thread(self, db->db_wr_thread, xlog_wr_run_thread);
2764
 
}
2765
 
 
2766
 
/*
2767
 
 * This function is called on database shutdown.
2768
 
 * We will wait a certain amounnt of time for the writer to
2769
 
 * complete its work.
2770
 
 * If it takes to long we will abort!
2771
 
 */
2772
 
xtPublic void xt_wait_for_writer(XTThreadPtr self, XTDatabaseHPtr db)
2773
 
{
2774
 
        time_t  then, now;
2775
 
        xtBool  message = FALSE;
2776
 
 
2777
 
        if (db->db_wr_thread) {
2778
 
                then = time(NULL);
2779
 
                while (xt_comp_log_pos(db->db_wr_log_id, db->db_wr_log_offset, db->db_wr_flush_point_log_id, db->db_wr_flush_point_log_offset) < 0) {
2780
 
 
2781
 
                        xt_lock_mutex(self, &db->db_wr_lock);
2782
 
                        pushr_(xt_unlock_mutex, &db->db_wr_lock);
2783
 
                        db->db_wr_thread_waiting++;
2784
 
                        /* Wake the writer so that it con complete its work. */
2785
 
                        if (db->db_wr_idle) {
2786
 
                                if (!xt_broadcast_cond_ns(&db->db_wr_cond))
2787
 
                                        xt_log_and_clear_exception_ns();
2788
 
                        }
2789
 
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2790
 
 
2791
 
                        xt_sleep_milli_second(10);
2792
 
 
2793
 
                        xt_lock_mutex(self, &db->db_wr_lock);
2794
 
                        pushr_(xt_unlock_mutex, &db->db_wr_lock);
2795
 
                        db->db_wr_thread_waiting--;
2796
 
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2797
 
 
2798
 
                        now = time(NULL);
2799
 
                        if (now >= then + 16) {
2800
 
                                xt_logf(XT_NT_INFO, "Aborting wait for '%s' writer\n", db->db_name);
2801
 
                                message = FALSE;
2802
 
                                break;
2803
 
                        }
2804
 
                        if (now >= then + 2) {
2805
 
                                if (!message) {
2806
 
                                        message = TRUE;
2807
 
                                        xt_logf(XT_NT_INFO, "Waiting for '%s' writer...\n", db->db_name);
2808
 
                                }
2809
 
                        }
2810
 
                }
2811
 
                
2812
 
                if (message)
2813
 
                        xt_logf(XT_NT_INFO, "Writer '%s' done.\n", db->db_name);
2814
 
        }
2815
 
}
2816
 
 
2817
 
xtPublic void xt_stop_writer(XTThreadPtr self, XTDatabaseHPtr db)
2818
 
{
2819
 
        XTThreadPtr thr_wr;
2820
 
 
2821
 
        if (db->db_wr_thread) {
2822
 
                xt_lock_mutex(self, &db->db_wr_lock);
2823
 
                pushr_(xt_unlock_mutex, &db->db_wr_lock);
2824
 
 
2825
 
                /* This pointer is safe as long as you have the transaction lock. */
2826
 
                if ((thr_wr = db->db_wr_thread)) {
2827
 
                        xtThreadID tid = thr_wr->t_id;
2828
 
 
2829
 
                        /* Make sure the thread quits when woken up. */
2830
 
                        xt_terminate_thread(self, thr_wr);
2831
 
 
2832
 
                        /* Wake the writer thread so that it will quit: */
2833
 
                        xt_broadcast_cond(self, &db->db_wr_cond);
2834
 
        
2835
 
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2836
 
 
2837
 
                        /*
2838
 
                         * GOTCHA: This is a wierd thing but the SIGTERM directed
2839
 
                         * at a particular thread (in this case the sweeper) was
2840
 
                         * being caught by a different thread and killing the server
2841
 
                         * sometimes. Disconcerting.
2842
 
                         * (this may only be a problem on Mac OS X)
2843
 
                        xt_kill_thread(thread);
2844
 
                         */
2845
 
                        xt_wait_for_thread_to_exit(tid, FALSE);
2846
 
        
2847
 
                        /* PMC - This should not be necessary to set the signal here, but in the
2848
 
                         * debugger the handler is not called!!?
2849
 
                        thr_wr->t_delayed_signal = SIGTERM;
2850
 
                        xt_kill_thread(thread);
2851
 
                         */
2852
 
                        db->db_wr_thread = NULL;
2853
 
                }
2854
 
                else
2855
 
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
2856
 
        }
2857
 
}
2858
 
 
2859
 
#ifdef NOT_USED
2860
 
static void xlog_add_to_flush_buffer(u_int flush_count, XTXLogBlockPtr *flush_buffer, XTXLogBlockPtr block)
2861
 
{
2862
 
        register u_int          count = flush_count;
2863
 
        register u_int          i;
2864
 
        register u_int          guess;
2865
 
        register xtInt8         r;
2866
 
 
2867
 
        i = 0;
2868
 
        while (i < count) {
2869
 
                guess = (i + count - 1) >> 1;
2870
 
                r = (xtInt8) block->xlb_address - (xtInt8) flush_buffer[guess]->xlb_address;
2871
 
                if (r == 0) {
2872
 
                        // Should not happen...
2873
 
                        ASSERT_NS(FALSE);
2874
 
                        return;
2875
 
                }
2876
 
                if (r < (xtInt8) 0)
2877
 
                        count = guess;
2878
 
                else
2879
 
                        i = guess + 1;
2880
 
        }
2881
 
 
2882
 
        /* Insert at position i */
2883
 
        memmove(flush_buffer + i + 1, flush_buffer + i, (flush_count - i) * sizeof(XTXLogBlockPtr));
2884
 
        flush_buffer[i] = block;
2885
 
}
2886
 
 
2887
 
static XTXLogBlockPtr xlog_find_block(XTOpenFilePtr file, xtLogID log_id, off_t address, XTXLogCacheSegPtr *ret_seg)
2888
 
{
2889
 
        register XTXLogCacheSegPtr      seg;
2890
 
        register XTXLogBlockPtr         block;
2891
 
        register u_int                          hash_idx;
2892
 
        register XTXLogCacheRec         *dcg = &xt_xlog_cache;
2893
 
 
2894
 
        seg = &dcg->xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
2895
 
        hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % dcg->xlc_hash_size;
2896
 
 
2897
 
        xt_lock_mutex_ns(&seg->lcs_lock);
2898
 
        retry:
2899
 
        block = seg->lcs_hash_table[hash_idx];
2900
 
        while (block) {
2901
 
                if (block->xlb_address == address && block->xlb_log_id == log_id) {
2902
 
                        ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
2903
 
 
2904
 
                        /* Wait if the block is being read or written.
2905
 
                         * If we will just read the data, then we don't care
2906
 
                         * if the buffer is being written.
2907
 
                         */
2908
 
                        if (block->xlb_state == XLC_BLOCK_READING) {
2909
 
                                if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100))
2910
 
                                        break;
2911
 
                                goto retry;
2912
 
                        }
2913
 
 
2914
 
                        *ret_seg = seg;
2915
 
                        return block;
2916
 
                }
2917
 
                block = block->xlb_next;
2918
 
        }
2919
 
        
2920
 
        /* Block not found: */
2921
 
        xt_unlock_mutex_ns(&seg->lcs_lock);
2922
 
        return NULL;
2923
 
}
2924
 
 
2925
 
static int xlog_cmp_log_files(struct XTThread *self, register const void *thunk, register const void *a, register const void *b)
2926
 
{
2927
 
#pragma unused(self, thunk)
2928
 
        xtLogID                         lf_id = *((xtLogID *) a);
2929
 
        XTXactLogFilePtr        lf_ptr = (XTXactLogFilePtr) b;
2930
 
 
2931
 
        if (lf_id < lf_ptr->lf_log_id)
2932
 
                return -1;
2933
 
        if (lf_id == lf_ptr->lf_log_id)
2934
 
                return 0;
2935
 
        return 1;
2936
 
}
2937
 
 
2938
 
#endif
2939
 
 
2940
 
 
2941
 
#ifdef OLD_CODE
2942
 
/*
 * Move least-recently-used blocks of the log cache to the free list.
 *
 * Outline (labels refer to the code below):
 *  retry:          Take the current LRU block (xlc_lru_block); return OK
 *                  if there is none. Remember its ru_time, log ID and
 *                  address, derive the segment and hash index from them,
 *                  and lock the segment.
 *  free_more:      Search the segment's hash chain for the block.
 *                  - If its xlb_ru_time changed, it was used in the
 *                    meantime: start over at retry, or stop (done_ok) if
 *                    the global lock is already held.
 *                  - If it is in state XLC_BLOCK_READING: stop (done_ok)
 *                    if the global lock is held, otherwise wait up to
 *                    100 ms and start over (FAILED if the wait fails).
 *                  - If the block is no longer in the chain, release the
 *                    locks and start over.
 *  free_the_block: Unlink the block from the hash chain, take the global
 *                  cache lock (xlc_lock) if not yet held, unlink the
 *                  block from the LRU/MRU list, push it on the free list
 *                  and mark it XLC_BLOCK_FREE. While the free count is
 *                  below XT_XLC_MAX_FREE_COUNT, look at up to
 *                  XLC_SEGMENT_COUNT more-recently-used blocks for one in
 *                  the same segment and free that too (free_more).
 *  done_ok:        Release the global and segment locks, return OK.
 *  failed:         Release the segment lock, return FAILED.
 */
static xtBool xlog_free_lru_blocks()
2943
 
{
2944
 
        XTXLogBlockPtr          block, pblock;
2945
 
        xtWord4                         ru_time;
2946
 
        xtLogID                         log_id;
2947
 
        off_t                           address;
2948
 
        //off_t                         hash;
2949
 
        XTXLogCacheSegPtr       seg;
2950
 
        u_int                           hash_idx;
2951
 
        xtBool                          have_global_lock = FALSE;
2952
 
 
2953
 
#ifdef DEBUG_CHECK_CACHE
2954
 
        //xt_xlog_check_cache();
2955
 
#endif
2956
 
        retry:
2957
 
        if (!(block = xt_xlog_cache.xlc_lru_block))
2958
 
                return OK;
2959
 
 
2960
 
        ru_time = block->xlb_ru_time;
2961
 
        log_id = block->xlb_log_id;
2962
 
        address = block->xlb_address;
2963
 
 
2964
 
        /*
2965
 
        hash = (address >> XT_XLC_BLOCK_SHIFTS) ^ ((off_t) log_id << 28);
2966
 
        seg = &xt_xlog_cache.xlc_segment[hash & XLC_SEGMENT_MASK];
2967
 
        hash_idx = (hash >> XT_XLC_SEGMENT_SHIFTS) % xt_xlog_cache.xlc_hash_size;
2968
 
        */
2969
 
        seg = &xt_xlog_cache.xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK];
2970
 
        hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % xt_xlog_cache.xlc_hash_size;
2971
 
 
2972
 
        xt_lock_mutex_ns(&seg->lcs_lock);
2973
 
 
2974
 
        free_more:
2975
 
        pblock = NULL;
2976
 
        block = seg->lcs_hash_table[hash_idx];
2977
 
        while (block) {
2978
 
                if (block->xlb_address == address && block->xlb_log_id == log_id) {
2979
 
                        ASSERT_NS(block->xlb_state != XLC_BLOCK_FREE);
2980
 
                        
2981
 
                        /* Try again if the block has been used in the meantime: */
2982
 
                        if (ru_time != block->xlb_ru_time) {
2983
 
                                if (have_global_lock)
2984
 
                                        /* Having this lock means we have already freed at least one block so
2985
 
                                         * don't bother to free more if we are having trouble.
2986
 
                                         */
2987
 
                                        goto done_ok;
2988
 
 
2989
 
                                /* If the recently used time has changed, then the
2990
 
                                 * block is probably no longer the LR used.
2991
 
                                 */
2992
 
                                xt_unlock_mutex_ns(&seg->lcs_lock);
2993
 
                                goto retry;
2994
 
                        }
2995
 
 
2996
 
                        /* Wait if the block is being read: */
2997
 
                        if (block->xlb_state == XLC_BLOCK_READING) {
2998
 
                                if (have_global_lock)
2999
 
                                        goto done_ok;
3000
 
 
3001
 
                                /* Wait for the block to be read, then try again. */
3002
 
                                if (!xt_timed_wait_cond_ns(&seg->lcs_cond, &seg->lcs_lock, 100))
3003
 
                                        goto failed;
3004
 
                                xt_unlock_mutex_ns(&seg->lcs_lock);
3005
 
                                goto retry;
3006
 
                        }
3007
 
                        
3008
 
                        goto free_the_block;
3009
 
                }
3010
 
                pblock = block;
3011
 
                block = block->xlb_next;
3012
 
        }
3013
 
 
3014
 
        if (have_global_lock) {
3015
 
                xt_unlock_mutex_ns(&xt_xlog_cache.xlc_lock);
3016
 
                have_global_lock = FALSE;
3017
 
        }
3018
 
 
3019
 
        /* We did not find the block, someone else freed it... */
3020
 
        xt_unlock_mutex_ns(&seg->lcs_lock);
3021
 
        goto retry;
3022
 
 
3023
 
        free_the_block:
3024
 
        ASSERT_NS(block->xlb_state == XLC_BLOCK_CLEAN);
3025
 
 
3026
 
        /* Remove from the hash table: */
3027
 
        if (pblock)
3028
 
                pblock->xlb_next = block->xlb_next;
3029
 
        else
3030
 
                seg->lcs_hash_table[hash_idx] = block->xlb_next;
3031
 
 
3032
 
        /* Now free the block */
3033
 
        if (!have_global_lock) {
3034
 
                xt_lock_mutex_ns(&xt_xlog_cache.xlc_lock);
3035
 
                have_global_lock = TRUE;
3036
 
        }
3037
 
 
3038
 
        /* Remove from the MRU list: */
3039
 
        if (xt_xlog_cache.xlc_lru_block == block)
3040
 
                xt_xlog_cache.xlc_lru_block = block->xlb_mr_used;
3041
 
        if (xt_xlog_cache.xlc_mru_block == block)
3042
 
                xt_xlog_cache.xlc_mru_block = block->xlb_lr_used;
3043
 
        if (block->xlb_lr_used)
3044
 
                block->xlb_lr_used->xlb_mr_used = block->xlb_mr_used;
3045
 
        if (block->xlb_mr_used)
3046
 
                block->xlb_mr_used->xlb_lr_used = block->xlb_lr_used;
3047
 
 
3048
 
        /* Put the block on the free list: */
3049
 
        block->xlb_next = xt_xlog_cache.xlc_free_list;
3050
 
        xt_xlog_cache.xlc_free_list = block;
3051
 
        xt_xlog_cache.xlc_free_count++;
3052
 
        block->xlb_state = XLC_BLOCK_FREE;
3053
 
 
3054
 
        if (xt_xlog_cache.xlc_free_count < XT_XLC_MAX_FREE_COUNT) {
3055
 
                /* Now that we have all the locks, try to free some more in this segment: */
3056
 
                block = block->xlb_mr_used;
3057
 
                for (u_int i=0; block && i<XLC_SEGMENT_COUNT; i++) {
3058
 
                        ru_time = block->xlb_ru_time;
3059
 
                        log_id = block->xlb_log_id;
3060
 
                        address = block->xlb_address;
3061
 
 
3062
 
                        if (seg == &xt_xlog_cache.xlc_segment[((u_int) address >> XT_XLC_BLOCK_SHIFTS) & XLC_SEGMENT_MASK]) {
3063
 
                                hash_idx = (((u_int) (address >> (XT_XLC_SEGMENT_SHIFTS + XT_XLC_BLOCK_SHIFTS))) ^ (log_id << 16)) % xt_xlog_cache.xlc_hash_size;
3064
 
                                goto free_more;
3065
 
                        }
3066
 
                        block = block->xlb_mr_used;
3067
 
                }
3068
 
        }
3069
 
 
3070
 
        /* Both the global cache lock and the segment lock are held here. */
        done_ok:
3071
 
        xt_unlock_mutex_ns(&xt_xlog_cache.xlc_lock);
3072
 
        xt_unlock_mutex_ns(&seg->lcs_lock);
3073
 
#ifdef DEBUG_CHECK_CACHE
3074
 
        //xt_xlog_check_cache();
3075
 
#endif
3076
 
        return OK;
3077
 
        
3078
 
        /* Only the segment lock is held here. */
        failed:
3079
 
        xt_unlock_mutex_ns(&seg->lcs_lock);
3080
 
#ifdef DEBUG_CHECK_CACHE
3081
 
        //xt_xlog_check_cache();
3082
 
#endif
3083
 
        return FAILED;
3084
 
}
3085
 
 
3086
 
#endif