~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/restart_xt.cc

  • Committer: Olaf van der Spek
  • Date: 2011-02-12 18:24:24 UTC
  • mto: (2167.1.2 build) (2172.1.4 build)
  • mto: This revision was merged to the branch mainline in revision 2168.
  • Revision ID: olafvdspek@gmail.com-20110212182424-kgnm9osi7qo97at2
casts

Show diffs side-by-side

added

removed

Lines of Context:
 
1
/* Copyright (C) 2007 PrimeBase Technologies GmbH
 
2
 *
 
3
 * PrimeBase XT
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 
18
 *
 
19
 * 2007-11-12   Paul McCullagh
 
20
 *
 
21
 * H&G2JCtL
 
22
 *
 
23
 * Restart and write data to the database.
 
24
 */
 
25
 
 
26
#include "xt_config.h"
 
27
 
 
28
#include <signal.h>
 
29
#include <time.h>
 
30
 
 
31
#ifndef DRIZZLED
 
32
#include "mysql_priv.h"
 
33
#endif
 
34
 
 
35
#include "ha_pbxt.h"
 
36
 
 
37
#ifdef DRIZZLED
 
38
#include <drizzled/data_home.h>
 
39
using drizzled::module::Registry;
 
40
#endif
 
41
 
 
42
#include "xactlog_xt.h"
 
43
#include "database_xt.h"
 
44
#include "util_xt.h"
 
45
#include "strutil_xt.h"
 
46
#include "filesys_xt.h"
 
47
#include "myxt_xt.h"
 
48
#include "trace_xt.h"
 
49
 
 
50
static xtBool xres_get_next_to_flush(XTDatabaseHPtr db, xtTableID *tab_id, int *flush_bit, xtBool skip_busy, xtBool rec_first);
 
51
 
 
52
#ifdef DEBUG
 
53
//#define DEBUG_PRINT
 
54
//#define DEBUG_KEEP_LOGS
 
55
//#define PRINT_LOG_ON_RECOVERY
 
56
//#define TRACE_RECORD_DATA
 
57
//#define SKIP_STARTUP_CHECKPOINT
 
58
//#define NEVER_CHECKPOINT
 
59
//#define TRACE_CHECKPOINT
 
60
#endif
 
61
 
 
62
#define PRINTF          printf
 
63
//#define PRINTF                xt_ftracef
 
64
//#define PRINTF                xt_trace
 
65
 
 
66
/*
 
67
 * -----------------------------------------------------------------------
 
68
 * GLOBALS
 
69
 */
 
70
 
 
71
xtPublic int                            pbxt_recovery_state;
 
72
 
 
73
/*
 
74
 * -----------------------------------------------------------------------
 
75
 * UTILITIES
 
76
 */
 
77
 
 
78
#ifdef TRACE_RECORD_DATA
/*
 * Dump len bytes starting at buf through PRINTF, each byte as a two-digit
 * hex value followed by a space.
 */
static void xt_print_bytes(xtWord1 *buf, u_int len)
{
	xtWord1 *end;

	for (end = buf + len; buf < end; buf++)
		PRINTF("%02x ", (u_int) *buf);
}
#endif
 
87
 
 
88
void xt_print_log_record(xtLogID log, xtLogOffset offset, XTXactLogBufferDPtr record)
 
89
{
 
90
        const char              *type = NULL;
 
91
        const char              *rec_type = NULL;
 
92
        xtOpSeqNo               op_no = 0;
 
93
        xtTableID               tab_id = 0;
 
94
        xtRowID                 row_id = 0;
 
95
        xtRecordID              rec_id = 0;
 
96
        xtBool                  xn_set = FALSE;
 
97
        xtXactID                xn_id = 0;
 
98
        char                    buffer[200];
 
99
        XTTabRecExtDPtr rec_buf;
 
100
        XTTabRecExtDPtr ext_rec;
 
101
        XTTabRecFixDPtr fix_rec;
 
102
        u_int                   rec_len;
 
103
        xtLogID                 log_id = 0;
 
104
        xtLogOffset             log_offset = 0;
 
105
 
 
106
        rec_buf = NULL;
 
107
        ext_rec = NULL;
 
108
        fix_rec = NULL;
 
109
        rec_len = 0;
 
110
        switch (record->xl.xl_status_1) {
 
111
                case XT_LOG_ENT_REC_MODIFIED:
 
112
                case XT_LOG_ENT_UPDATE:
 
113
                case XT_LOG_ENT_INSERT:
 
114
                case XT_LOG_ENT_DELETE:
 
115
                case XT_LOG_ENT_UPDATE_BG:
 
116
                case XT_LOG_ENT_INSERT_BG:
 
117
                case XT_LOG_ENT_DELETE_BG:
 
118
                        op_no = XT_GET_DISK_4(record->xu.xu_op_seq_4);
 
119
                        tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
 
120
                        rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
 
121
                        xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
 
122
                        row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
 
123
                        rec_len = XT_GET_DISK_2(record->xu.xu_size_2);
 
124
                        xn_set = TRUE;
 
125
                        type="rec";
 
126
                        rec_buf = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
 
127
                        ext_rec = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
 
128
                        if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
 
129
                                log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
 
130
                                log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
 
131
                        }
 
132
                        else {
 
133
                                ext_rec = NULL;
 
134
                                fix_rec = (XTTabRecFixDPtr) &record->xu.xu_rec_type_1;
 
135
                        }
 
136
                        break;
 
137
                case XT_LOG_ENT_UPDATE_FL:
 
138
                case XT_LOG_ENT_INSERT_FL:
 
139
                case XT_LOG_ENT_DELETE_FL:
 
140
                case XT_LOG_ENT_UPDATE_FL_BG:
 
141
                case XT_LOG_ENT_INSERT_FL_BG:
 
142
                case XT_LOG_ENT_DELETE_FL_BG:
 
143
                        op_no = XT_GET_DISK_4(record->xf.xf_op_seq_4);
 
144
                        tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
 
145
                        rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
 
146
                        xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
 
147
                        row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
 
148
                        rec_len = XT_GET_DISK_2(record->xf.xf_size_2);
 
149
                        xn_set = TRUE;
 
150
                        type="rec";
 
151
                        rec_buf = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
 
152
                        ext_rec = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
 
153
                        if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
 
154
                                log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
 
155
                                log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
 
156
                        }
 
157
                        else {
 
158
                                ext_rec = NULL;
 
159
                                fix_rec = (XTTabRecFixDPtr) &record->xf.xf_rec_type_1;
 
160
                        }
 
161
                        break;
 
162
                case XT_DEFUNKT_REC_FREED:
 
163
                case XT_DEFUNKT_REC_REMOVED:
 
164
                case XT_DEFUNKT_REC_REMOVED_EXT:
 
165
                        op_no = XT_GET_DISK_4(record->fr.fr_op_seq_4);
 
166
                        tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);
 
167
                        rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
 
168
                        xn_id = XT_GET_DISK_4(record->fr.fr_xact_id_4);
 
169
                        xn_set = TRUE;
 
170
                        type="rec";
 
171
                        break;
 
172
                case XT_LOG_ENT_REC_REMOVED_BI:
 
173
                        op_no = XT_GET_DISK_4(record->rb.rb_op_seq_4);
 
174
                        tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
 
175
                        rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
 
176
                        xn_id = XT_GET_DISK_4(record->rb.rb_xact_id_4);
 
177
                        row_id = XT_GET_DISK_4(record->rb.rb_row_id_4);
 
178
                        rec_len = XT_GET_DISK_2(record->rb.rb_size_2);
 
179
                        xn_set = TRUE;
 
180
                        type="rec";
 
181
                        rec_buf = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
 
182
                        ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
 
183
                        if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {
 
184
                                log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
 
185
                                log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
 
186
                        }
 
187
                        else {
 
188
                                ext_rec = NULL;
 
189
                                fix_rec = (XTTabRecFixDPtr) &record->rb.rb_rec_type_1;
 
190
                        }
 
191
                        break;
 
192
                case XT_LOG_ENT_REC_REMOVED_BI_L:
 
193
                        op_no = XT_GET_DISK_4(record->bl.bl_op_seq_4);
 
194
                        tab_id = XT_GET_DISK_4(record->bl.bl_tab_id_4);
 
195
                        rec_id = XT_GET_DISK_4(record->bl.bl_rec_id_4);
 
196
                        xn_id = XT_GET_DISK_4(record->bl.bl_xact_id_4);
 
197
                        row_id = XT_GET_DISK_4(record->bl.bl_row_id_4);
 
198
                        rec_len = XT_GET_DISK_2(record->bl.bl_size_2);
 
199
                        xn_set = TRUE;
 
200
                        type="rec";
 
201
                        rec_buf = (XTTabRecExtDPtr) &record->bl.bl_rec_type_1;
 
202
                        ext_rec = (XTTabRecExtDPtr) &record->bl.bl_rec_type_1;
 
203
                        if (XT_REC_IS_EXT_DLOG(record->bl.bl_rec_type_1)) {
 
204
                                log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
 
205
                                log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
 
206
                        }
 
207
                        else {
 
208
                                ext_rec = NULL;
 
209
                                fix_rec = (XTTabRecFixDPtr) &record->bl.bl_rec_type_1;
 
210
                        }
 
211
                        break;
 
212
                case XT_LOG_ENT_REC_MOVED:
 
213
                        op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
 
214
                        tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
 
215
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
 
216
                        log_id = XT_GET_DISK_2(&record->xw.xw_rec_type_1);                      // This is actually correct
 
217
                        log_offset = XT_GET_DISK_6(record->xw.xw_next_rec_id_4);        // This is actually correct!
 
218
                        type="rec";
 
219
                        break;
 
220
                case XT_LOG_ENT_REC_CLEANED:
 
221
                case XT_LOG_ENT_REC_CLEANED_1:
 
222
                case XT_LOG_ENT_REC_UNLINKED:
 
223
                        op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
 
224
                        tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
 
225
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
 
226
                        type="rec";
 
227
                        break;
 
228
                case XT_LOG_ENT_ROW_NEW:
 
229
                case XT_LOG_ENT_ROW_NEW_FL:
 
230
                case XT_LOG_ENT_ROW_ADD_REC:
 
231
                case XT_LOG_ENT_ROW_SET:
 
232
                case XT_LOG_ENT_ROW_FREED:
 
233
                        op_no = XT_GET_DISK_4(record->xa.xa_op_seq_4);
 
234
                        tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
 
235
                        rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
 
236
                        type="row";
 
237
                        break;
 
238
                case XT_LOG_ENT_NO_OP:
 
239
                        op_no = XT_GET_DISK_4(record->no.no_op_seq_4);
 
240
                        tab_id = XT_GET_DISK_4(record->no.no_tab_id_4);
 
241
                        type="-";
 
242
                        break;
 
243
                case XT_LOG_ENT_END_OF_LOG:
 
244
                        break;
 
245
        }
 
246
 
 
247
        switch (record->xl.xl_status_1) {
 
248
                case XT_LOG_ENT_HEADER:
 
249
                        rec_type = "HEADER";
 
250
                        break;
 
251
                case XT_LOG_ENT_NEW_LOG:
 
252
                        rec_type = "NEW LOG";
 
253
                        break;
 
254
                case XT_LOG_ENT_DEL_LOG:
 
255
                        sprintf(buffer, "DEL LOG log=%d ", (int) XT_GET_DISK_4(record->xl.xl_log_id_4));
 
256
                        rec_type = buffer;
 
257
                        break;
 
258
                case XT_LOG_ENT_NEW_TAB:
 
259
                        rec_type = "NEW TABLE";
 
260
                        tab_id = XT_GET_DISK_4(record->xt.xt_tab_id_4);
 
261
                        break;
 
262
                case XT_LOG_ENT_COMMIT:
 
263
                        rec_type = "COMMIT";
 
264
                        xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
 
265
                        xn_set = TRUE;
 
266
                        break;
 
267
                case XT_LOG_ENT_ABORT:
 
268
                        rec_type = "ABORT";
 
269
                        xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
 
270
                        xn_set = TRUE;
 
271
                        break;
 
272
                case XT_LOG_ENT_CLEANUP:
 
273
                        rec_type = "CLEANUP";
 
274
                        xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
 
275
                        xn_set = TRUE;
 
276
                        break;
 
277
                case XT_LOG_ENT_REC_MODIFIED:
 
278
                        rec_type = "MODIFIED";
 
279
                        break;
 
280
                case XT_LOG_ENT_UPDATE:
 
281
                        rec_type = "UPDATE";
 
282
                        break;
 
283
                case XT_LOG_ENT_UPDATE_FL:
 
284
                        rec_type = "UPDATE-FL";
 
285
                        break;
 
286
                case XT_LOG_ENT_INSERT:
 
287
                        rec_type = "INSERT";
 
288
                        break;
 
289
                case XT_LOG_ENT_INSERT_FL:
 
290
                        rec_type = "INSERT-FL";
 
291
                        break;
 
292
                case XT_LOG_ENT_DELETE:
 
293
                        rec_type = "DELETE";
 
294
                        break;
 
295
                case XT_LOG_ENT_DELETE_FL:
 
296
                        rec_type = "DELETE-FL";
 
297
                        break;
 
298
                case XT_LOG_ENT_UPDATE_BG:
 
299
                        rec_type = "UPDATE-BG";
 
300
                        break;
 
301
                case XT_LOG_ENT_UPDATE_FL_BG:
 
302
                        rec_type = "UPDATE-FL-BG";
 
303
                        break;
 
304
                case XT_LOG_ENT_INSERT_BG:
 
305
                        rec_type = "INSERT-BG";
 
306
                        break;
 
307
                case XT_LOG_ENT_INSERT_FL_BG:
 
308
                        rec_type = "INSERT-FL-BG";
 
309
                        break;
 
310
                case XT_LOG_ENT_DELETE_BG:
 
311
                        rec_type = "DELETE-BG";
 
312
                        break;
 
313
                case XT_LOG_ENT_DELETE_FL_BG:
 
314
                        rec_type = "DELETE-FL-BG";
 
315
                        break;
 
316
                case XT_DEFUNKT_REC_FREED:
 
317
                        rec_type = "FREE REC";
 
318
                        break;
 
319
                case XT_DEFUNKT_REC_REMOVED:
 
320
                        rec_type = "REMOVED REC";
 
321
                        break;
 
322
                case XT_DEFUNKT_REC_REMOVED_EXT:
 
323
                        rec_type = "REMOVED-X REC";
 
324
                        break;
 
325
                case XT_LOG_ENT_REC_REMOVED_BI:
 
326
                        rec_type = "REMOVED-BI REC";
 
327
                        break;
 
328
                case XT_LOG_ENT_REC_REMOVED_BI_L:
 
329
                        rec_type = "REMOVED-BI (L) REC";
 
330
                        break;
 
331
                case XT_LOG_ENT_REC_MOVED:
 
332
                        rec_type = "MOVED REC";
 
333
                        break;
 
334
                case XT_LOG_ENT_REC_CLEANED:
 
335
                        rec_type = "CLEAN REC";
 
336
                        break;
 
337
                case XT_LOG_ENT_REC_CLEANED_1:
 
338
                        rec_type = "CLEAN REC-1";
 
339
                        break;
 
340
                case XT_LOG_ENT_REC_UNLINKED:
 
341
                        rec_type = "UNLINK REC";
 
342
                        break;
 
343
                case XT_LOG_ENT_ROW_NEW:
 
344
                        rec_type = "NEW ROW";
 
345
                        break;
 
346
                case XT_LOG_ENT_ROW_NEW_FL:
 
347
                        rec_type = "NEW ROW-FL";
 
348
                        break;
 
349
                case XT_LOG_ENT_ROW_ADD_REC:
 
350
                        rec_type = "REC ADD ROW";
 
351
                        break;
 
352
                case XT_LOG_ENT_ROW_SET:
 
353
                        rec_type = "SET ROW";
 
354
                        break;
 
355
                case XT_LOG_ENT_ROW_FREED:
 
356
                        rec_type = "FREE ROW";
 
357
                        break;
 
358
                case XT_LOG_ENT_OP_SYNC:
 
359
                        rec_type = "OP SYNC";
 
360
                        break;
 
361
                case XT_LOG_ENT_NO_OP:
 
362
                        rec_type = "NO OP";
 
363
                        break;
 
364
                case XT_LOG_ENT_END_OF_LOG:
 
365
                        rec_type = "END OF LOG";
 
366
                        break;
 
367
                case XT_LOG_ENT_PREPARE:
 
368
                        rec_type = "PREPARE";
 
369
                        xn_id = XT_GET_DISK_4(record->xp.xp_xact_id_4);
 
370
                        xn_set = TRUE;
 
371
                        break;
 
372
        }
 
373
 
 
374
        if (log)
 
375
                PRINTF("log=%d offset=%d ", (int) log, (int) offset);
 
376
        PRINTF("%s ", rec_type);
 
377
        if (type)
 
378
                PRINTF("op=%lu tab=%lu %s=%lu ", (u_long) op_no, (u_long) tab_id, type, (u_long) rec_id);
 
379
        else if (tab_id)
 
380
                PRINTF("tab=%lu ", (u_long) tab_id);
 
381
        if (row_id)
 
382
                PRINTF("row=%lu ", (u_long) row_id);
 
383
        if (log_id)
 
384
                PRINTF("log=%lu offset=%lu ", (u_long) log_id, (u_long) log_offset);
 
385
        if (xn_set)
 
386
                PRINTF("xact=%lu ", (u_long) xn_id);
 
387
 
 
388
#ifdef TRACE_RECORD_DATA
 
389
        if (rec_buf) {
 
390
                switch (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_MASK) {
 
391
                        case XT_TAB_STATUS_FREED:
 
392
                                PRINTF("FREE");
 
393
                                break;
 
394
                        case XT_TAB_STATUS_DELETE:
 
395
                                PRINTF("DELE");
 
396
                                break;
 
397
                        case XT_TAB_STATUS_FIXED:
 
398
                                PRINTF("FIX-");
 
399
                                break;
 
400
                        case XT_TAB_STATUS_VARIABLE:
 
401
                                PRINTF("VAR-");
 
402
                                break;
 
403
                        case XT_TAB_STATUS_EXT_DLOG:
 
404
                                PRINTF("EXT-");
 
405
                                break;
 
406
                }
 
407
                if (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT)
 
408
                        PRINTF("C");
 
409
                else
 
410
                        PRINTF(" ");
 
411
        }
 
412
        if (ext_rec) {
 
413
                rec_len -= offsetof(XTTabRecExtDRec, re_data);
 
414
                xt_print_bytes((xtWord1 *) ext_rec, offsetof(XTTabRecExtDRec, re_data));
 
415
                PRINTF("| ");
 
416
                if (rec_len > 20)
 
417
                        rec_len = 20;
 
418
                xt_print_bytes(ext_rec->re_data, rec_len);
 
419
        }
 
420
        if (fix_rec) {
 
421
                rec_len -= offsetof(XTTabRecFixDRec, rf_data);
 
422
                xt_print_bytes((xtWord1 *) fix_rec, offsetof(XTTabRecFixDRec, rf_data));
 
423
                PRINTF("| ");
 
424
                if (rec_len > 20)
 
425
                        rec_len = 20;
 
426
                xt_print_bytes(fix_rec->rf_data, rec_len);
 
427
        }
 
428
#endif
 
429
 
 
430
        PRINTF("\n");
 
431
}
 
432
 
 
433
#ifdef DEBUG_PRINT
/*
 * Debug helper: read the whole of "./test/test_tab-1.xtr" as an array of
 * 8-byte words and print the index of every word that is zero.
 *
 * The file is opened on the first successful call and the handle is cached
 * in a static for later calls. The scratch buffer is released on every
 * call; if allocation or the read fails, nothing is printed.
 */
void check_rows(void)
{
	static XTOpenFilePtr of = NULL;
	size_t		size;
	xtWord8		*buffer;

	if (!of)
		of = xt_open_file_ns("./test/test_tab-1.xtr", XT_FS_DEFAULT);
	if (!of)
		return;

	size = (size_t) xt_seek_eof_file(NULL, of);
	if (!(buffer = (xtWord8 *) xt_malloc_ns(size)))
		return;

	/* Only scan the buffer if it was actually filled. */
	if (xt_pread_file(of, 0, size, size, buffer, NULL)) {
		for (size_t i=0; i<size/8; i++) {
			if (!buffer[i])
				printf("%d is NULL\n", (int) i);
		}
	}

	/* Previously leaked on every call. */
	xt_free_ns(buffer);
}

#endif
 
452
 
 
453
/*
 
454
 * -----------------------------------------------------------------------
 
455
 * DELAYED WRITE FUNCTIONS
 
456
 */
 
457
 
 
458
#ifdef XT_SORT_REC_WRITES
 
459
 
 
460
xtPublic xtBool xt_xres_delay_flush(XTOpenTablePtr ot, xtBool lock)
 
461
{
 
462
        XTTableHPtr             tab = ot->ot_table;
 
463
        XTDelayWritePtr ptr;
 
464
        XTThreadPtr             thread = ot->ot_thread;
 
465
 
 
466
        if (!xt_sl_get_size(tab->tab_rec_dw_writes))
 
467
                return OK;
 
468
 
 
469
        if (lock)
 
470
                xt_sl_lock_ns(tab->tab_rec_dw_writes, ot->ot_thread);
 
471
        if ((ptr = (XTDelayWritePtr) xt_sl_item_at(tab->tab_rec_dw_writes, 0))) {
 
472
                u_int i, size;
 
473
 
 
474
                size = xt_sl_get_size(tab->tab_rec_dw_writes);
 
475
                i = 0;
 
476
                while (i<size) {
 
477
                        if (i+1 < size &&
 
478
                                ptr->dw_rec_id+1 == (ptr+1)->dw_rec_id &&
 
479
                                (ptr+1)->dw_offset == 0 &&
 
480
                                ptr->dw_data + tab->tab_dic.dic_rec_size - ptr->dw_offset == (ptr+1)->dw_data) {
 
481
                                XTDelayWritePtr start_ptr;
 
482
                                u_int                   start_i;
 
483
                                size_t                  tfer;
 
484
 
 
485
                                start_ptr = ptr;
 
486
                                start_i = i;
 
487
                                while (i+1 < size &&
 
488
                                        ptr->dw_rec_id+1 == (ptr+1)->dw_rec_id && 
 
489
                                        (ptr+1)->dw_offset == 0 &&
 
490
                                        ptr->dw_data + tab->tab_dic.dic_rec_size - ptr->dw_offset == (ptr+1)->dw_data) {
 
491
                                        ptr++;
 
492
                                        i++;
 
493
                                }
 
494
                                tfer = (ptr->dw_data + ptr->dw_size) - start_ptr->dw_data;
 
495
                                if (!xt_pwrite_file(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, start_ptr->dw_rec_id) + start_ptr->dw_offset, tfer, &tab->tab_rec_dw_data[start_ptr->dw_data], &thread->st_statistics.st_rec, thread))
 
496
                                        goto failed;
 
497
                        }
 
498
                        else {
 
499
                                if (!xt_pwrite_file(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, ptr->dw_rec_id) + ptr->dw_offset, ptr->dw_size, &tab->tab_rec_dw_data[ptr->dw_data], &thread->st_statistics.st_rec, thread))
 
500
                                        goto failed;
 
501
                        }
 
502
                        ptr++;
 
503
                        i++;
 
504
                }
 
505
        }
 
506
        xt_sl_set_size(tab->tab_rec_dw_writes, 0);
 
507
        tab->tab_rec_dw_data_usage = 0;
 
508
        tab->tab_head_op_seq = tab->tab_rec_dw_op_seq;
 
509
        if (lock)
 
510
                xt_sl_unlock_ns(tab->tab_rec_dw_writes);
 
511
        return OK;
 
512
 
 
513
        failed:
 
514
        if (lock)
 
515
                xt_sl_unlock_ns(tab->tab_rec_dw_writes);
 
516
        return FAILED;
 
517
}
 
518
 
 
519
xtPublic void xt_xres_flush_all(XTThreadPtr self, XTWriterStatePtr ws)
 
520
{
 
521
        xtTableID               tab_id;
 
522
        XTOpenTablePtr  ot;
 
523
 
 
524
        for (int i=0; i<XT_TABLE_LIST_SIZE; i++) {
 
525
                if ((tab_id = ws->ws_tab_flush_list[i])) {
 
526
                        if ((ot = ws->ws_ot) && ot->ot_table->tab_id == tab_id) {
 
527
                                if (!xt_xres_delay_flush(ot, TRUE))
 
528
                                        xt_throw(self);
 
529
                        }
 
530
                        else {
 
531
                                if ((ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE))) {
 
532
                                        if (!xt_xres_delay_flush(ot, TRUE))
 
533
                                                xt_throw(self);
 
534
                                        xt_db_return_table_to_pool(self, ot);
 
535
                                }
 
536
                        }
 
537
                        ws->ws_tab_flush_list[i] = 0;
 
538
                }
 
539
        }
 
540
}
 
541
 
 
542
static void xres_table_to_flush(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)
 
543
{
 
544
        u_int idx;
 
545
 
 
546
        retry:
 
547
        idx = tab_id % XT_TABLE_LIST_SIZE;
 
548
        if (ws->ws_tab_flush_list[idx] == tab_id)
 
549
                return;
 
550
        if (!ws->ws_tab_flush_list[idx])
 
551
                goto add_it;
 
552
        idx = (idx + XT_TABLE_LIST_INC) % XT_TABLE_LIST_SIZE;
 
553
        if (ws->ws_tab_flush_list[idx] == tab_id)
 
554
                return;
 
555
        if (!ws->ws_tab_flush_list[idx])
 
556
                goto add_it;
 
557
        idx = (idx + XT_TABLE_LIST_INC) % XT_TABLE_LIST_SIZE;
 
558
        if (ws->ws_tab_flush_list[idx] == tab_id)
 
559
                return;
 
560
        if (!ws->ws_tab_flush_list[idx])
 
561
                goto add_it;
 
562
        idx = (idx + XT_TABLE_LIST_INC) % XT_TABLE_LIST_SIZE;
 
563
        if (ws->ws_tab_flush_list[idx] == tab_id)
 
564
                return;
 
565
        if (!ws->ws_tab_flush_list[idx])
 
566
                goto add_it;
 
567
 
 
568
        /* After 4 tries we consider the table too full: */
 
569
        xt_xres_flush_all(self, ws);
 
570
        goto retry;
 
571
 
 
572
        add_it:
 
573
        ws->ws_tab_flush_list[idx] = tab_id;
 
574
}
 
575
 
 
576
static void xres_delay_write(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, size_t offs, size_t size, xtWord1 *data, xtBool in_sequence, XTWriterStatePtr ws)
 
577
{
 
578
        XTTableHPtr tab = ot->ot_table;
 
579
 
 
580
        ASSERT_NS(offs <= 0xFFFF);
 
581
        ASSERT_NS(size <= 0xFFFF);
 
582
        if (in_sequence && tab->tab_rec_dw_writes) {
 
583
                size_t                  idx;
 
584
                XTDelayWriteRec dw, *dw_ptr;
 
585
                size_t                  part_size;
 
586
 
 
587
                xt_sl_lock_ns(tab->tab_rec_dw_writes, ot->ot_thread);
 
588
 
 
589
                if (tab->tab_rec_dw_data_usage > XT_SORT_REC_MAX_BUF_SIZE) {
 
590
                        if (!xt_xres_delay_flush(ot, FALSE))
 
591
                                goto failed;
 
592
                }
 
593
 
 
594
                add_new_rec:
 
595
                if ((dw_ptr = (XTDelayWritePtr) xt_sl_search(self, &idx, tab->tab_rec_dw_writes, &rec_id))) {
 
596
                        /* Record has already been written: */
 
597
                        if (offs >= dw_ptr->dw_offset) {
 
598
                                if (offs + size <= dw_ptr->dw_offset + dw_ptr->dw_size)
 
599
                                        /* Completely in old block: */
 
600
                                        memcpy(tab->tab_rec_dw_data + dw_ptr->dw_data + (offs - dw_ptr->dw_offset), data, size);
 
601
                                else {
 
602
                                        if (offs > dw_ptr->dw_offset + dw_ptr->dw_size) {
 
603
                                                /* Records do not overlap (cannot handle this situation)! */
 
604
                                                if (!xt_xres_delay_flush(ot, FALSE))
 
605
                                                        goto failed;
 
606
                                                goto add_new_rec;
 
607
                                        }
 
608
 
 
609
                                        /* The current record overlaps, and follows on: */
 
610
                                        part_size = offs - dw_ptr->dw_offset;
 
611
                                        
 
612
                                        if (tab->tab_rec_dw_data_usage + part_size + size > tab->tab_rec_dw_data_size) {
 
613
                                                if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + part_size + size))
 
614
                                                        goto failed;
 
615
                                                tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + part_size + size;
 
616
                                        }
 
617
                                        
 
618
                                        memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage, tab->tab_rec_dw_data + dw_ptr->dw_data, part_size);
 
619
                                        memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage + part_size, data, size);
 
620
                                        dw_ptr->dw_size = (xtWord2) (part_size + size);
 
621
                                        dw_ptr->dw_data = tab->tab_rec_dw_data_usage;
 
622
                                        tab->tab_rec_dw_data_usage += part_size + size;
 
623
                                }
 
624
                        }
 
625
                        else {
 
626
                                if (offs + size >= dw_ptr->dw_offset + dw_ptr->dw_size) {
 
627
                                        /* Completely covers old update (and is bigger): */
 
628
                                        if (tab->tab_rec_dw_data_usage + size > tab->tab_rec_dw_data_size) {
 
629
                                                if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + size))
 
630
                                                        goto failed;
 
631
                                                tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + size;
 
632
                                        }
 
633
                                        memcpy(&tab->tab_rec_dw_data[tab->tab_rec_dw_data_usage], data, size);
 
634
                                        dw_ptr->dw_offset = (xtWord2) offs;
 
635
                                        dw_ptr->dw_size = (xtWord2) size;
 
636
                                        dw_ptr->dw_data = tab->tab_rec_dw_data_usage;
 
637
                                        tab->tab_rec_dw_data_usage += size;
 
638
                                }
 
639
                                else {
 
640
                                        if (offs + size < dw_ptr->dw_offset) {
 
641
                                                /* Records do not overlap (cannot handle this situation)! */
 
642
                                                if (!xt_xres_delay_flush(ot, FALSE))
 
643
                                                        goto failed;
 
644
                                                goto add_new_rec;
 
645
                                        }
 
646
 
 
647
                                        /* The part in the record we want to keep: */
 
648
                                        part_size = dw_ptr->dw_offset + dw_ptr->dw_size - (offs + size);
 
649
 
 
650
                                        if (tab->tab_rec_dw_data_usage + part_size + size > tab->tab_rec_dw_data_size) {
 
651
                                                if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + part_size + size))
 
652
                                                        goto failed;
 
653
                                                tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + part_size + size;
 
654
                                        }
 
655
 
 
656
                                        memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage, data, size);
 
657
                                        memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage + size, tab->tab_rec_dw_data + dw_ptr->dw_data + dw_ptr->dw_size - part_size, part_size);
 
658
                                        dw_ptr->dw_offset = (xtWord2) offs;
 
659
                                        dw_ptr->dw_size = (xtWord2) (part_size + size);
 
660
                                        dw_ptr->dw_data = tab->tab_rec_dw_data_usage;
 
661
                                        tab->tab_rec_dw_data_usage += part_size + size;
 
662
                                }
 
663
                        }
 
664
                        
 
665
                        goto done_add;
 
666
                }
 
667
 
 
668
                /* New record added, make space for at least 3 records! */
 
669
                size_t inc_size;
 
670
 
 
671
                inc_size = tab->tab_dic.dic_rec_size * 3;
 
672
                ASSERT(size*3 <= inc_size);
 
673
                if (tab->tab_rec_dw_data_usage + inc_size > tab->tab_rec_dw_data_size) {
 
674
                        if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + (inc_size * 4)))
 
675
                                goto failed;
 
676
                        tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + (inc_size * 4);
 
677
                }
 
678
 
 
679
                if (offs == 0) {
 
680
                        if (idx > 0) {
 
681
                                /* This record follows on after another */
 
682
                                dw_ptr = (XTDelayWritePtr) xt_sl_item_at(tab->tab_rec_dw_writes, idx-1);
 
683
                                
 
684
                                /* Check if the record before can be expanded! */
 
685
                                if (dw_ptr->dw_rec_id+1 == rec_id &&
 
686
                                        dw_ptr->dw_offset == 0 &&
 
687
                                        dw_ptr->dw_size >= XT_REC_EXT_HEADER_SIZE &&
 
688
                                        dw_ptr->dw_size < tab->tab_dic.dic_rec_size &&
 
689
                                        dw_ptr->dw_data + dw_ptr->dw_size == tab->tab_rec_dw_data_usage) {
 
690
                                        /* Round the size up... */
 
691
                                        tab->tab_rec_dw_data_usage = dw_ptr->dw_data + tab->tab_dic.dic_rec_size;
 
692
                                }
 
693
                        }
 
694
                }
 
695
 
 
696
                memcpy(&tab->tab_rec_dw_data[tab->tab_rec_dw_data_usage], data, size);
 
697
                dw.dw_rec_id = rec_id;
 
698
                dw.dw_offset = (xtWord2) offs;
 
699
                dw.dw_size = (xtWord2) size;
 
700
                dw.dw_data = tab->tab_rec_dw_data_usage;
 
701
                xt_sl_insert_item_at(self, tab->tab_rec_dw_writes, idx, &dw);
 
702
                tab->tab_rec_dw_data_usage += size;
 
703
 
 
704
                done_add:
 
705
                xt_sl_unlock_ns(tab->tab_rec_dw_writes);
 
706
 
 
707
                /* Add the table to the table list! */
 
708
                return xres_table_to_flush(self, ws, tab->tab_id);
 
709
        }
 
710
 
 
711
        if (xt_sl_get_size(tab->tab_rec_dw_writes)) {
 
712
                if (!xt_xres_delay_flush(ot, TRUE))
 
713
                        xt_throw(self);
 
714
        }
 
715
 
 
716
        if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, rec_id) + offs, size, data))
 
717
                xt_throw(self);
 
718
        return;
 
719
 
 
720
        failed:
 
721
        xt_sl_unlock_ns(tab->tab_rec_dw_writes);
 
722
        xt_throw(self);
 
723
}
 
724
 
 
725
/* In this configuration every record write is routed through xres_delay_write(),
 * which receives all arguments unchanged (including 'in_seq' and 'ws'). */
#define WRITE_REC_FILE(self, ot, rec_id, offs, size, data, in_seq, ws)	xres_delay_write(self, ot, rec_id, offs, size, data, in_seq, ws)
 
726
 
 
727
#else
 
728
 
 
729
static void xres_write_rec(XTThreadPtr self, XTOpenTablePtr ot, off_t offset, size_t size, xtWord1 *data)
 
730
{
 
731
        if (!xt_tab_write_rec(ot, offset, size, data))
 
732
                xt_throw(self);
 
733
}
 
734
 
 
735
/* In this configuration the record is written immediately at the file offset of
 * 'rec_id' plus 'offs'. The 'in_seq' and 'ws' arguments are not used.
 * The table is taken from the open table 'ot' itself, so the macro no longer
 * depends on a variable called 'tab' being in scope at the call site. */
#define WRITE_REC_FILE(self, ot, rec_id, offs, size, data, in_seq, ws)	xres_write_rec(self, ot, xt_rec_id_to_rec_offset((ot)->ot_table, (rec_id)) + (offs), size, data)
 
736
 
 
737
#endif
 
738
 
 
739
/* ----------------------------------------------------------------------
 
740
 * APPLYING CHANGES IN SEQUENCE
 
741
 */
 
742
 
 
743
typedef struct XTOperation {
 
744
        xtOpSeqNo                               or_op_seq;
 
745
        xtWord4                                 or_op_len;
 
746
        xtLogID                                 or_log_id;
 
747
        xtLogOffset                             or_log_offset;
 
748
} XTOperationRec, *XTOperationPtr;
 
749
 
 
750
static int xres_cmp_op_seq(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
 
751
{
 
752
        xtOpSeqNo               lf_op_seq = *((xtOpSeqNo *) a);
 
753
        XTOperationPtr  lf_ptr = (XTOperationPtr) b;
 
754
 
 
755
        if (lf_op_seq == lf_ptr->or_op_seq)
 
756
                return 0;
 
757
        if (XTTableSeq::xt_op_is_before(lf_op_seq, lf_ptr->or_op_seq))
 
758
                return -1;
 
759
        return 1;
 
760
}
 
761
 
 
762
xtPublic void xt_xres_init_tab(XTThreadPtr self, XTTableHPtr tab)
 
763
{
 
764
        tab->tab_op_list = xt_new_sortedlist(self, sizeof(XTOperationRec), 20, 1000, xres_cmp_op_seq, NULL, NULL, TRUE, FALSE);
 
765
}
 
766
 
 
767
xtPublic void xt_xres_exit_tab(XTThreadPtr self, XTTableHPtr tab)
 
768
{
 
769
        if (tab->tab_op_list) {
 
770
                xt_free_sortedlist(self, tab->tab_op_list);
 
771
                tab->tab_op_list = NULL;
 
772
        }
 
773
}
 
774
 
 
775
static xtBool xres_open_table(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)
 
776
{
 
777
        XTOpenTablePtr  ot;
 
778
 
 
779
        if ((ot = ws->ws_ot)) {
 
780
                if (ot->ot_table->tab_id == tab_id)
 
781
                        return OK;
 
782
                xt_db_return_table_to_pool(self, ot);
 
783
                ws->ws_ot = NULL;
 
784
        }
 
785
 
 
786
        /* Don't recover tables that cannot be found, or tables that 
 
787
         * temporary:
 
788
         */
 
789
        if (ws->ws_tab_gone == tab_id || ws->ws_tab_temp == tab_id)
 
790
                return FAILED;
 
791
 
 
792
        if ((ws->ws_ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE))) {
 
793
                XTTableHPtr             tab;
 
794
 
 
795
                tab = ws->ws_ot->ot_table;
 
796
                
 
797
                /* Don't recover temporary tables! */
 
798
                if (ws->ws_in_recover && XT_IS_TEMP_TABLE(tab->tab_dic.dic_tab_flags)) {
 
799
                        tab->tab_recovery_not_done = TRUE;
 
800
                        tab->tab_dic.dic_disable_index = XT_INDEX_NOT_RECOVERED;
 
801
                        xt_db_return_table_to_pool(self, ws->ws_ot);
 
802
                        ws->ws_ot = NULL;
 
803
                        ws->ws_tab_temp = tab_id;
 
804
                        return FAILED;
 
805
                }
 
806
 
 
807
                if (!tab->tab_ind_rec_log_id) {
 
808
                        /* Should not happen... */
 
809
                        tab->tab_ind_rec_log_id = ws->ws_ind_rec_log_id;
 
810
                        tab->tab_ind_rec_log_offset = ws->ws_ind_rec_log_offset;
 
811
                }
 
812
                return OK;
 
813
        }
 
814
 
 
815
        ws->ws_tab_gone = tab_id;
 
816
        return FAILED;
 
817
}
 
818
 
 
819
/* {INDEX-RECOV_ROWID}
 
820
 * Add missing index entries during recovery.
 
821
 * Set the row ID even if the index entry
 
822
 * is not committed. It will be removed later by
 
823
 * the sweeper.
 
824
 */
 
825
static xtBool xres_add_index_entries(XTOpenTablePtr ot, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_data)
 
826
{
 
827
        XTTableHPtr                     tab = ot->ot_table;
 
828
        u_int                           idx_cnt;
 
829
        XTIndexPtr                      *ind;
 
830
        //XTIdxSearchKeyRec     key;
 
831
 
 
832
        if (tab->tab_dic.dic_disable_index)
 
833
                return OK;
 
834
 
 
835
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
 
836
                if (!xt_idx_insert(ot, *ind, row_id, rec_id, rec_data, NULL, TRUE)) {
 
837
                        /* Check the error, certain errors are recoverable! */
 
838
                        XTThreadPtr self = xt_get_self();
 
839
 
 
840
                        if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
 
841
                                (XT_FILE_IN_USE(self->t_exception.e_sys_err) ||
 
842
                                 XT_FILE_ACCESS_DENIED(self->t_exception.e_sys_err) ||
 
843
                                 XT_FILE_TOO_MANY_OPEN(self->t_exception.e_sys_err) ||
 
844
                                 self->t_exception.e_sys_err == XT_ENOMEM)) {
 
845
                                ot->ot_err_index_no = (*ind)->mi_index_no;
 
846
                                return FAILED;
 
847
                        }
 
848
 
 
849
                        /* TODO: Write something to the index header to indicate that
 
850
                         * it is corrupted.
 
851
                         */
 
852
                        xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
 
853
                        xt_log_and_clear_exception_ns();
 
854
                        return OK;
 
855
                }
 
856
        }
 
857
        return OK;
 
858
}
 
859
 
 
860
static void xres_remove_index_entries(XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *rec_data)
 
861
{
 
862
        XTTableHPtr     tab = ot->ot_table;
 
863
        u_int           idx_cnt;
 
864
        XTIndexPtr      *ind;
 
865
 
 
866
        if (tab->tab_dic.dic_disable_index)
 
867
                return;
 
868
 
 
869
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
 
870
                if (!xt_idx_delete(ot, *ind, rec_id, rec_data))
 
871
                        xt_log_and_clear_exception_ns();
 
872
        }
 
873
}
 
874
 
 
875
static xtWord1 *xres_load_record(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *data, size_t red_size, XTInfoBufferPtr rec_buf, u_int cols_req)
 
876
{
 
877
        XTTableHPtr     tab = ot->ot_table;
 
878
        xtWord1         *rec_data;
 
879
 
 
880
        rec_data = ot->ot_row_rbuffer;
 
881
 
 
882
        ASSERT(red_size <= ot->ot_row_rbuf_size);
 
883
        ASSERT(tab->tab_dic.dic_rec_size <= ot->ot_row_rbuf_size);
 
884
        if (data) {
 
885
                if (rec_data != data)
 
886
                        memcpy(rec_data, data, red_size);
 
887
        }
 
888
        else {
 
889
                /* It can be that less than 'dic_rec_size' was written for
 
890
                 * variable length type records.
 
891
                 * If this is the last record in the file, then we will read
 
892
                 * less than actual record size.
 
893
                 */
 
894
                if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, rec_data, &red_size, &self->st_statistics.st_rec, self))
 
895
                        goto failed;
 
896
                
 
897
                if (red_size < sizeof(XTTabRecHeadDRec))
 
898
                        return NULL;
 
899
        }
 
900
        
 
901
        if (XT_REC_IS_FIXED(rec_data[0]))
 
902
                rec_data = ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE;
 
903
        else {
 
904
                if (!xt_ib_alloc(NULL, rec_buf, tab->tab_dic.dic_mysql_buf_size))
 
905
                        goto failed;
 
906
                if (XT_REC_IS_VARIABLE(rec_data[0])) {
 
907
                        if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
 
908
                                goto failed;
 
909
                }
 
910
                else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
 
911
                        if (red_size < XT_REC_EXT_HEADER_SIZE)
 
912
                                return NULL;
 
913
 
 
914
                        ASSERT(cols_req);
 
915
                        if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
 
916
                                if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
 
917
                                        goto failed;
 
918
                        }
 
919
                        else {
 
920
                                if (!xt_tab_load_ext_data(ot, rec_id, rec_buf->ib_db.db_data, cols_req))
 
921
                                        goto failed;
 
922
                        }
 
923
                }
 
924
                else
 
925
                        /* This is possible, the record has already been cleaned up. */
 
926
                        return NULL;
 
927
                rec_data = rec_buf->ib_db.db_data;
 
928
        }
 
929
 
 
930
        return rec_data;
 
931
 
 
932
        failed:
 
933
        /* Running out of memory should not be ignored. */
 
934
        if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
 
935
                self->t_exception.e_sys_err == XT_ENOMEM)
 
936
                xt_throw(self);
 
937
        xt_log_and_clear_exception_ns();
 
938
        return NULL;
 
939
}
 
940
 
 
941
/*
 
942
 * Apply a change from the log.
 
943
 *
 
944
 * This function is basically very straightforward, were it not
 
945
 * for the option to apply operations out of sequence.
 
946
 * (i.e. in_sequence == FALSE)
 
947
 *
 
948
 * If operations are applied in sequence, then they can be
 
949
 * applied blindly. The update operation is just executed as
 
950
 * it was logged.
 
951
 *
 
952
 * If the changes are not in sequence, then some operations are missing,
 
953
 * however, the operations that are present are in the correct order.
 
954
 *
 
955
 * This can only happen at the end of recovery!!!
 
956
 * After we have applied all operations in the log we may be
 
957
 * left with some operations that have not been applied
 
958
 * because operations were logged out of sequence.
 
959
 *
 
960
 * The application of these operations has to take into
 
961
 * account the current state of the database.
 
962
 * They are then applied in a manner that maintains the
 
963
 * database consistency.
 
964
 *
 
965
 * For example, a record that is freed is freed by placing it
 
966
 * on the current free list. Part of the data logged for the
 
967
 * operation is ignored. Namely: the "next block" pointer
 
968
 * that was originally written into the freed record.
 
969
 */
 
970
static void xres_apply_change(XTThreadPtr self, XTWriterStatePtr ws, XTXactLogBufferDPtr record, xtBool in_sequence, xtBool check_index, xtOpSeqNo op_seq)
 
971
{
 
972
        XTOpenTablePtr          ot = ws->ws_ot;
 
973
        XTInfoBufferPtr         rec_buf = &ws->ws_rec_buf;
 
974
        XTTableHPtr                     tab = ot->ot_table;
 
975
        size_t                          len;
 
976
        xtRecordID                      rec_id;
 
977
        xtRefID                         free_ref_id;
 
978
        XTTabRecFreeDRec        free_rec;
 
979
        xtRowID                         row_id;
 
980
        XTTabRowRefDRec         row_buf;
 
981
        XTTabRecHeadDRec        rec_head;
 
982
        size_t                          tfer;
 
983
        xtRecordID                      link_rec_id, prev_link_rec_id;
 
984
        xtWord1                         *rec_data = NULL;
 
985
        XTTabRecFreeDPtr        free_data;
 
986
        u_int                           status;
 
987
 
 
988
        ASSERT(ot->ot_thread == self);
 
989
        if (tab->tab_dic.dic_key_count == 0)
 
990
                check_index = FALSE;
 
991
 
 
992
        status = record->xl.xl_status_1;
 
993
        switch (status) {
 
994
                case XT_LOG_ENT_REC_MODIFIED:
 
995
                case XT_LOG_ENT_UPDATE:
 
996
                case XT_LOG_ENT_INSERT:
 
997
                case XT_LOG_ENT_DELETE:
 
998
                case XT_LOG_ENT_UPDATE_BG:
 
999
                case XT_LOG_ENT_INSERT_BG:
 
1000
                case XT_LOG_ENT_DELETE_BG:
 
1001
                        rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
 
1002
 
 
1003
                        /* This should be done before we apply change to table, as otherwise we lose
 
1004
                         * the key value that we need to remove from index
 
1005
                         */
 
1006
                        if (check_index && status == XT_LOG_ENT_REC_MODIFIED) {
 
1007
                                if ((rec_data = xres_load_record(self, ot, rec_id, NULL, 0, rec_buf, tab->tab_dic.dic_ind_cols_req)))
 
1008
                                        xres_remove_index_entries(ot, rec_id, rec_data);                        
 
1009
                        }
 
1010
 
 
1011
                        len = (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
 
1012
                        WRITE_REC_FILE(self, ot, rec_id, 0, len, (xtWord1 *) &record->xu.xu_rec_type_1, in_sequence, ws);
 
1013
                        tab->tab_bytes_to_flush += len;
 
1014
 
 
1015
                        if (check_index) {
 
1016
                                switch (status) {
 
1017
                                        case XT_LOG_ENT_DELETE:
 
1018
                                        case XT_LOG_ENT_DELETE_BG:
 
1019
                                                break;
 
1020
                                        default:
 
1021
                                                if ((rec_data = xres_load_record(self, ot, rec_id, &record->xu.xu_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
 
1022
                                                        row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
 
1023
                                                        if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
 
1024
                                                                xt_throw(self);
 
1025
                                                }
 
1026
                                                break;
 
1027
                                }
 
1028
                        }
 
1029
 
 
1030
                        if (!in_sequence) {
 
1031
                                /* A record has been allocated from the EOF, but out of sequence.
 
1032
                                 * This could leave a gap where other records were allocated
 
1033
                                 * from the EOF, but those operations have been lost!
 
1034
                                 * We compensate for this by adding all blocks between
 
1035
                                 * to the free list.
 
1036
                                 */
 
1037
                                free_rec.rf_rec_type_1 = XT_TAB_STATUS_FREED;
 
1038
                                free_rec.rf_not_used_1 = 0;
 
1039
                                while (tab->tab_head_rec_eof_id < rec_id) {
 
1040
                                        XT_SET_DISK_4(free_rec.rf_next_rec_id_4, tab->tab_head_rec_free_id);
 
1041
                                        if (!xt_tab_write_rec(ot, tab->tab_head_rec_eof_id, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec))
 
1042
                                                xt_throw(self);
 
1043
                                        tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
 
1044
                                        tab->tab_head_rec_free_id = tab->tab_head_rec_eof_id;
 
1045
                                        tab->tab_head_rec_eof_id++;
 
1046
                                }
 
1047
                        }
 
1048
                        if (tab->tab_head_rec_eof_id < rec_id + 1)
 
1049
                                tab->tab_head_rec_eof_id = rec_id + 1;
 
1050
                        tab->tab_flush_pending = TRUE;
 
1051
                        break;
 
1052
                case XT_LOG_ENT_UPDATE_FL:
 
1053
                case XT_LOG_ENT_INSERT_FL:
 
1054
                case XT_LOG_ENT_DELETE_FL:
 
1055
                case XT_LOG_ENT_UPDATE_FL_BG:
 
1056
                case XT_LOG_ENT_INSERT_FL_BG:
 
1057
                case XT_LOG_ENT_DELETE_FL_BG:
 
1058
                        rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
 
1059
                        len = (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
 
1060
                        free_ref_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);
 
1061
 
 
1062
                        if (check_index &&
 
1063
                                record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL &&
 
1064
                                record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL_BG) {
 
1065
                                if ((rec_data = xres_load_record(self, ot, rec_id, &record->xf.xf_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
 
1066
                                        row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
 
1067
                                        if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
 
1068
                                                xt_throw(self);
 
1069
                                }
 
1070
                        }
 
1071
 
 
1072
                        if (!in_sequence) {
 
1073
                                /* This record was allocated from the free list.
 
1074
                                 * Because this operation is out of sequence, there
 
1075
                                 * could have been other allocations from the
 
1076
                                 * free list before this, that have gone missing.
 
1077
                                 * For this reason we have to search the current
 
1078
                                 * free list and remove the record.
 
1079
                                 */
 
1080
                                link_rec_id = tab->tab_head_rec_free_id;
 
1081
                                prev_link_rec_id = 0;
 
1082
                                while (link_rec_id) {
 
1083
                                        if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecFreeDRec), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, NULL, &self->st_statistics.st_rec, self))
 
1084
                                                xt_throw(self);
 
1085
                                        if (link_rec_id == rec_id)
 
1086
                                                break;
 
1087
                                        prev_link_rec_id = link_rec_id;
 
1088
                                        link_rec_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
 
1089
                                }
 
1090
                                if (link_rec_id == rec_id) {
 
1091
                                        /* The block was found on the free list.
 
1092
                                         * remove it: */
 
1093
                                        if (prev_link_rec_id) {
 
1094
                                                /* We write the record from position 'link_rec_id' into
 
1095
                                                 * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
 
1096
                                                 */
 
1097
                                                if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, prev_link_rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec))
 
1098
                                                        xt_throw(self);
 
1099
                                                tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
 
1100
                                                free_ref_id = tab->tab_head_rec_free_id;
 
1101
                                        }
 
1102
                                        else
 
1103
                                                /* The block is at the front of the list: */
 
1104
                                                free_ref_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
 
1105
                                }
 
1106
                                else {
 
1107
                                        /* Not found on the free list? */
 
1108
                                        if (tab->tab_head_rec_eof_id < rec_id + 1)
 
1109
                                                tab->tab_head_rec_eof_id = rec_id + 1;
 
1110
                                        goto write_mod_data;
 
1111
                                }
 
1112
                        }
 
1113
                        if (tab->tab_head_rec_eof_id < rec_id + 1)
 
1114
                                tab->tab_head_rec_eof_id = rec_id + 1;
 
1115
                        tab->tab_head_rec_free_id = free_ref_id;
 
1116
                        tab->tab_head_rec_fnum--;
 
1117
                        write_mod_data:
 
1118
                        WRITE_REC_FILE(self, ot, rec_id, 0, len, (xtWord1 *) &record->xf.xf_rec_type_1, in_sequence, ws);
 
1119
                        tab->tab_bytes_to_flush += len;
 
1120
                        tab->tab_flush_pending = TRUE;
 
1121
                        break;
 
1122
                case XT_DEFUNKT_REC_REMOVED:
 
1123
                case XT_DEFUNKT_REC_REMOVED_EXT: {
 
1124
                        xtBool                  record_loaded;
 
1125
                        XTTabRecExtDPtr ext_rec;
 
1126
                        size_t                  red_size;
 
1127
                        xtWord4                 log_over_size = 0;
 
1128
                        xtLogID                 data_log_id = 0;
 
1129
                        xtLogOffset             data_log_offset = 0;
 
1130
                        u_int                   cols_required = 0;
 
1131
 
 
1132
                        rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
 
1133
                        free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
 
1134
 
 
1135
                        /* This is a short-cut, it does not require loading the record: */
 
1136
                        if (!check_index && !tab->tab_dic.dic_blob_count && record->fr.fr_status_1 != XT_DEFUNKT_REC_REMOVED_EXT)
 
1137
                                goto do_rec_freed;
 
1138
 
 
1139
                        ext_rec = (XTTabRecExtDPtr) ot->ot_row_rbuffer;
 
1140
 
 
1141
                        if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, ext_rec, &red_size, &self->st_statistics.st_rec, self)) {
 
1142
                                xt_log_and_clear_exception_ns();
 
1143
                                goto do_rec_freed;
 
1144
                        }
 
1145
 
 
1146
                        if (red_size < sizeof(XTTabRecHeadDRec))
 
1147
                                goto do_rec_freed;
 
1148
 
 
1149
                        /* Check that the record is the same as the one originally removed.
 
1150
                         * This can be different if recovery is repeated.
 
1151
                         * For example:
 
1152
                         * 
 
1153
                         * log=21 offset=6304472 REMOVED-X REC op=360616 tab=7 rec=25874 
 
1154
                         * log=21 offset=6309230 UPDATE-FL op=360618 tab=7 rec=25874 row=26667 log=1 offset=26503077 xact=209 
 
1155
                         * log=21 offset=6317500 CLEAN REC op=360631 tab=7 rec=25874 
 
1156
                         * 
 
1157
                         * If this recovery sequence is repeated, then the REMOVED-X will free the
 
1158
                         * extended record belonging to the update that came afterwards!
 
1159
                         *
 
1160
                         * Additional situation to consider:
 
1161
                         *
 
1162
                         * - A record "x" is created, and index entries created.
 
1163
                         * - A checkpoint is made done.
 
1164
                         * - Record "x" is deleted due to UPDATE.
 
1165
                         * - The index entries are removed, but the index is not
 
1166
                         *   flushed.
 
1167
                         * - This deletion is written to disk by the writer.
 
1168
                         * So we have the situation that the remove is on disk,
 
1169
                         * but the index changes have not been made.
 
1170
                         *
 
1171
                         * In this case, skipping to "do_rec_freed" is incorrect.
 
1172
                         */
 
1173
                        if (record->fr.fr_stat_id_1 != ext_rec->tr_stat_id_1 ||
 
1174
                                XT_GET_DISK_4(record->fr.fr_xact_id_4) != XT_GET_DISK_4(ext_rec->tr_xact_id_4))
 
1175
                                goto dont_remove_x_record;
 
1176
 
 
1177
                        if (status == XT_DEFUNKT_REC_REMOVED_EXT) {
 
1178
                                if (!XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1))
 
1179
                                        goto dont_remove_x_record;
 
1180
                                if (red_size < offsetof(XTTabRecExtDRec, re_data))
 
1181
                                        goto dont_remove_x_record;
 
1182
 
 
1183
                                /* Save this for later (can be overwritten by xres_load_record(): */
 
1184
                                data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
 
1185
                                data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
 
1186
                                log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
 
1187
                        }
 
1188
                        dont_remove_x_record:
 
1189
 
 
1190
                        record_loaded = FALSE;
 
1191
 
 
1192
                        if (check_index) {
 
1193
                                cols_required = tab->tab_dic.dic_ind_cols_req;
 
1194
                                if (tab->tab_dic.dic_blob_cols_req > cols_required)
 
1195
                                        cols_required = tab->tab_dic.dic_blob_cols_req;
 
1196
                                if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
 
1197
                                        goto do_rec_freed;
 
1198
                                record_loaded = TRUE;
 
1199
                                xres_remove_index_entries(ot, rec_id, rec_data);
 
1200
                        }
 
1201
 
 
1202
                        if (tab->tab_dic.dic_blob_count) {
 
1203
                                if (!record_loaded) {
 
1204
                                        if (tab->tab_dic.dic_blob_cols_req > cols_required)
 
1205
                                                cols_required = tab->tab_dic.dic_blob_cols_req;
 
1206
                                        if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
 
1207
                                                /* [(7)] REMOVE is followed by FREE:
 
1208
                                                goto get_rec_offset;
 
1209
                                                */
 
1210
                                                goto do_rec_freed;
 
1211
                                        record_loaded = TRUE;
 
1212
                                }
 
1213
                        }
 
1214
 
 
1215
                        if (status == XT_DEFUNKT_REC_REMOVED_EXT) {
 
1216
                                /* Note: dlb_delete_log() may be repeated, but should handle this:
 
1217
                                 * 
 
1218
                                 * Example:
 
1219
                                 * log=5 offset=213334 CLEAN REC op=28175 tab=1 rec=317428 
 
1220
                                 * ...
 
1221
                                 * log=6 offset=321063 REMOVED-X REC op=33878 tab=1 rec=317428 
 
1222
                                 *
 
1223
                                 * When this sequence is repeated during recovery, then CLEAN REC
 
1224
                                 * will reset the status byte of the record so that it
 
1225
                                 * comes back to here!
 
1226
                                 *
 
1227
                                 * The check for zero is probably not required here.
 
1228
                                 */
 
1229
                                if (data_log_id && data_log_offset && log_over_size) {
 
1230
                                        if (ot->ot_table->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE)
 
1231
                                                xt_tab_free_ext_slot(tab, data_log_id, data_log_offset, log_over_size);
 
1232
                                        else {
 
1233
                                                if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
 
1234
                                                        if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
 
1235
                                                                ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
 
1236
                                                                xt_log_and_clear_exception_ns();
 
1237
                                                }
 
1238
                                        }
 
1239
                                }
 
1240
                        }
 
1241
 
 
1242
                        goto do_rec_freed;
 
1243
                }
 
1244
                case XT_LOG_ENT_REC_REMOVED_BI:
 
1245
                case XT_LOG_ENT_REC_REMOVED_BI_L: 
 
1246
                {
 
1247
                        /*
 
1248
                         * For deletion we need the complete before image because of the following problem.
 
1249
                         *
 
1250
                         * DROP TABLE IF EXISTS t1;
 
1251
                         * CREATE TABLE t1 (ID int primary key auto_increment, value int, index (value)) engine=pbxt;
 
1252
                         * 
 
1253
                         * insert t1(value) values(50);
 
1254
                         * 
 
1255
                         * -- CHECKPOINT --
 
1256
                         * 
 
1257
                         * update t1 set value = 60;
 
1258
                         * 
 
1259
                         * -- PAUSE --
 
1260
                         * 
 
1261
                         * update t1 set value = 70;
 
1262
                         * 
 
1263
                         * -- CRASH --
 
1264
                         * 
 
1265
                         * select value from t1;
 
1266
                         * select * from t1;
 
1267
                         * 
 
1268
                         * 081203 12:11:46 [Note] PBXT: Recovering from 1-148, bytes to read: 33554284
 
1269
                         * log=1 offset=148 UPDATE-BG op=5 tab=1 rec=2 row=1 xact=3 
 
1270
                         * log=1 offset=188 REC ADD ROW op=6 tab=1 row=1 
 
1271
                         * log=1 offset=206 COMMIT xact=3 
 
1272
                         * log=1 offset=216 REMOVED REC op=7 tab=1 rec=1 xact=2 
 
1273
                         * log=1 offset=241 CLEAN REC op=8 tab=1 rec=2 
 
1274
                         * log=1 offset=261 CLEANUP xact=3 
 
1275
                         * log=1 offset=267 UPDATE-FL-BG op=9 tab=1 rec=1 row=1 xact=4 
 
1276
                         * log=1 offset=311 REC ADD ROW op=10 tab=1 row=1 
 
1277
                         * log=1 offset=329 COMMIT xact=4 
 
1278
                         * log=1 offset=339 REMOVED REC op=11 tab=1 rec=2 xact=3 
 
1279
                         * log=1 offset=364 CLEAN REC op=12 tab=1 rec=1 
 
1280
                         * log=1 offset=384 CLEANUP xact=4 
 
1281
                         * 081203 12:12:15 [Note] PBXT: Recovering complete at 1-390, bytes read: 33554284
 
1282
                         * 
 
1283
                         * mysql> select value from t1;
 
1284
                         * +-------+
 
1285
                         * | value |
 
1286
                         * +-------+
 
1287
                         * |    50 | 
 
1288
                         * |    70 | 
 
1289
                         * +-------+
 
1290
                         * 2 rows in set (55.99 sec)
 
1291
                         * 
 
1292
                         * mysql> select * from t1;
 
1293
                         * +----+-------+
 
1294
                         * | ID | value |
 
1295
                         * +----+-------+
 
1296
                         * |  1 |    70 | 
 
1297
                         * +----+-------+
 
1298
                         * 1 row in set (0.00 sec)
 
1299
                         */
 
1300
                        XTTabRecExtDPtr ext_rec;
 
1301
                        xtWord4                 log_over_size = 0;
 
1302
                        xtLogID                 data_log_id = 0;
 
1303
                        xtLogOffset             data_log_offset = 0;
 
1304
                        u_int                   cols_required = 0;
 
1305
                        xtBool                  record_loaded;
 
1306
                        size_t                  rec_size;               
 
1307
 
 
1308
                        rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
 
1309
                        rec_size = XT_GET_DISK_2(record->rb.rb_size_2);
 
1310
 
 
1311
                        if (status == XT_LOG_ENT_REC_REMOVED_BI_L)
 
1312
                                ext_rec = (XTTabRecExtDPtr) &record->bl.bl_rec_type_1;
 
1313
                        else
 
1314
                                ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
 
1315
 
 
1316
                        if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
 
1317
                                /* Save this for later (can be overwritten by xres_load_record()): */
 
1318
                                data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
 
1319
                                data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
 
1320
                                log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
 
1321
                        }
 
1322
 
 
1323
                        record_loaded = FALSE;
 
1324
 
 
1325
                        if (check_index) {
 
1326
                                cols_required = tab->tab_dic.dic_ind_cols_req;
 
1327
                                if (!(rec_data = xres_load_record(self, ot, rec_id, (xtWord1 *) ext_rec, rec_size, rec_buf, cols_required)))
 
1328
                                        goto go_on_to_free;
 
1329
                                record_loaded = TRUE;
 
1330
                                xres_remove_index_entries(ot, rec_id, rec_data);
 
1331
                        }
 
1332
 
 
1333
                        if (data_log_id && data_log_offset && log_over_size) {
 
1334
                                if (ot->ot_table->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE)
 
1335
                                        xt_tab_free_ext_slot(tab, data_log_id, data_log_offset, log_over_size);
 
1336
                                else {
 
1337
                                        if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
 
1338
                                                if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
 
1339
                                                        ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
 
1340
                                                        xt_log_and_clear_exception_ns();
 
1341
                                        }
 
1342
                                }
 
1343
                        }
 
1344
 
 
1345
                        go_on_to_free:
 
1346
                        /* Use the new record type: */
 
1347
                        ext_rec->tr_rec_type_1 = record->rb.rb_new_rec_type_1;
 
1348
                        free_data = (XTTabRecFreeDPtr) ext_rec;
 
1349
 
 
1350
                        if (status == XT_LOG_ENT_REC_REMOVED_BI)
 
1351
                                goto do_rec_freed;
 
1352
 
 
1353
                        if (!in_sequence)
 
1354
                                goto do_rec_freed;
 
1355
 
 
1356
                        XTTabRecFreeDRec        prev_free_rec;
 
1357
                        xtRecordID                      prev_rec_id;
 
1358
 
 
1359
                        prev_rec_id = XT_GET_DISK_4(record->bl.bl_prev_rec_id_4);
 
1360
                        XT_SET_DISK_4(prev_free_rec.rf_next_rec_id_4, rec_id);
 
1361
                        WRITE_REC_FILE(self, ot, prev_rec_id, offsetof(XTTabRecFreeDRec, rf_next_rec_id_4), 4, prev_free_rec.rf_next_rec_id_4, in_sequence, ws);
 
1362
                        tab->tab_bytes_to_flush += 4;
 
1363
                        tab->tab_flush_pending = TRUE;
 
1364
                        
 
1365
                        tab->tab_head_rec_fnum++;
 
1366
                        WRITE_REC_FILE(self, ot, rec_id, 0, sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, in_sequence, ws);
 
1367
                        tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
 
1368
                        tab->tab_flush_pending = TRUE;
 
1369
                        break;
 
1370
                }
 
1371
                case XT_DEFUNKT_REC_FREED:
 
1372
                        rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
 
1373
                        free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
 
1374
                        do_rec_freed:
 
1375
                        if (!in_sequence) {
 
1376
                                size_t  red_size;
 
1377
 
 
1378
                                /* Free the record.
 
1379
                                 * We place the record at the front of the current
 
1380
                                 * free list.
 
1381
                                 *
 
1382
                                 * However, before we do this, we remove the record
 
1383
                                 * from its row list, if the record is on a row list.
 
1384
                                 *
 
1385
                                 * We do this here, because the normal removal
 
1386
                                 * from the row list uses the operations:
 
1387
                                 *
 
1388
                                 * XT_LOG_ENT_REC_UNLINKED, XT_LOG_ENT_ROW_SET and
 
1389
                                 * XT_LOG_ENT_ROW_FREED.
 
1390
                                 *
 
1391
                                 * When operations are performed out of sequence,
 
1392
                                 * these operations are ignored for the purpose
 
1393
                                 * of removing the record from the row.
 
1394
                                 */
 
1395
                                if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, NULL, &self->st_statistics.st_rec, self))
 
1396
                                        xt_throw(self);
 
1397
                                /* The record is already free: */
 
1398
                                if (XT_REC_IS_FREE(rec_head.tr_rec_type_1))
 
1399
                                        goto free_done;
 
1400
                                row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
 
1401
 
 
1402
                                /* Search the row for this record: */
 
1403
                                if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, NULL, &self->st_statistics.st_rec, self))
 
1404
                                        xt_throw(self);
 
1405
                                link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
 
1406
                                prev_link_rec_id = 0;
 
1407
                                while (link_rec_id) {
 
1408
                                        if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &red_size, &self->st_statistics.st_rec, self)) {
 
1409
                                                xt_log_and_clear_exception(self);
 
1410
                                                break;
 
1411
                                        }
 
1412
                                        if (red_size < sizeof(XTTabRecHeadDRec))
 
1413
                                                break;
 
1414
                                        if (link_rec_id == rec_id)
 
1415
                                                break;
 
1416
                                        if (XT_GET_DISK_4(rec_head.tr_row_id_4) != row_id)
 
1417
                                                break;
 
1418
                                        switch (rec_head.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
 
1419
                                                case XT_TAB_STATUS_FREED:
 
1420
                                                        break;
 
1421
                                                case XT_TAB_STATUS_DELETE:
 
1422
                                                case XT_TAB_STATUS_FIXED:
 
1423
                                                case XT_TAB_STATUS_VARIABLE:
 
1424
                                                case XT_TAB_STATUS_EXT_DLOG:
 
1425
                                                        break;
 
1426
                                                default:
 
1427
                                                        ASSERT(FALSE);
 
1428
                                                        goto exit_loop;
 
1429
                                        }
 
1430
                                        if (rec_head.tr_rec_type_1 & ~(XT_TAB_STATUS_CLEANED_BIT | XT_TAB_STATUS_MASK)) {
 
1431
                                                ASSERT(FALSE);
 
1432
                                                break;
 
1433
                                        }
 
1434
                                        prev_link_rec_id = link_rec_id;
 
1435
                                        link_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
 
1436
                                }
 
1437
 
 
1438
                                exit_loop:
 
1439
                                if (link_rec_id == rec_id) {
 
1440
                                        /* The record was found on the row list, remove it: */
 
1441
                                        if (prev_link_rec_id) {
 
1442
                                                /* We write the previous variation pointer from position 'link_rec_id' into
 
1443
                                                 * variation pointer of the 'prev_link_rec_id' record. This unlinks 'link_rec_id'!
 
1444
                                                 */
 
1445
                                                if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, prev_link_rec_id) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4), XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head.tr_prev_rec_id_4))
 
1446
                                                        xt_throw(self);
 
1447
                                                tab->tab_bytes_to_flush += XT_RECORD_ID_SIZE;
 
1448
                                        }
 
1449
                                        else {
 
1450
                                                /* The record is at the front of the row list: */
 
1451
                                                xtRefID ref_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
 
1452
                                                XT_SET_DISK_4(row_buf.rr_ref_id_4, ref_id);
 
1453
                                                if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf))
 
1454
                                                        xt_throw(self);
 
1455
                                                tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
 
1456
                                        }
 
1457
                                }                               
 
1458
 
 
1459
                                /* Now we free the record, by placing it at the front of
 
1460
                                 * the free list:
 
1461
                                 */
 
1462
                                XT_SET_DISK_4(free_data->rf_next_rec_id_4, tab->tab_head_rec_free_id);                          
 
1463
                        }
 
1464
                        tab->tab_head_rec_free_id = rec_id;
 
1465
                        tab->tab_head_rec_fnum++;
 
1466
                        WRITE_REC_FILE(self, ot, rec_id, 0, sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, in_sequence, ws);
 
1467
                        tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
 
1468
                        tab->tab_flush_pending = TRUE;
 
1469
                        free_done:
 
1470
                        break;
 
1471
                case XT_LOG_ENT_REC_MOVED:
 
1472
                        len = 8;
 
1473
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
 
1474
                        WRITE_REC_FILE(self, ot, rec_id, offsetof(XTTabRecExtDRec, re_log_id_2), len, (xtWord1 *) &record->xw.xw_rec_type_1, in_sequence, ws);
 
1475
                        tab->tab_bytes_to_flush += len;
 
1476
                        tab->tab_flush_pending = TRUE;
 
1477
                        break;
 
1478
                case XT_LOG_ENT_REC_CLEANED:
 
1479
                        len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
 
1480
                        goto get_rec_offset;
 
1481
                case XT_LOG_ENT_REC_CLEANED_1:
 
1482
                        len = 1;
 
1483
                        goto get_rec_offset;
 
1484
                case XT_LOG_ENT_REC_UNLINKED:
 
1485
                        if (!in_sequence) {
 
1486
                                /* Unlink the record.
 
1487
                                 * This is done when the record is freed.
 
1488
                                 */
 
1489
                                break;
 
1490
                        }
 
1491
                        len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
 
1492
                        get_rec_offset:
 
1493
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
 
1494
                        WRITE_REC_FILE(self, ot, rec_id, 0, len, (xtWord1 *) &record->xw.xw_rec_type_1, in_sequence, ws);
 
1495
                        tab->tab_bytes_to_flush += len;
 
1496
                        tab->tab_flush_pending = TRUE;
 
1497
                        break;
 
1498
                case XT_LOG_ENT_ROW_NEW:
 
1499
                        len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
 
1500
                        row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
 
1501
                        if (!in_sequence) {
 
1502
                                /* A row was allocated from the EOF, but operations are missing.
 
1503
                                 * The blocks between the current EOF and the new EOF need to be
 
1504
                                 * placed on the free list!
 
1505
                                 */                             
 
1506
                                while (tab->tab_head_row_eof_id < row_id) {
 
1507
                                        XT_SET_DISK_4(row_buf.rr_ref_id_4, tab->tab_head_row_free_id);
 
1508
                                        if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, tab->tab_head_row_eof_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf))
 
1509
                                                xt_throw(self);
 
1510
                                        tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
 
1511
                                        tab->tab_head_row_free_id = tab->tab_head_row_eof_id;
 
1512
                                        tab->tab_head_row_eof_id++;
 
1513
                                }
 
1514
                        }
 
1515
                        if (tab->tab_head_row_eof_id < row_id + 1)
 
1516
                                tab->tab_head_row_eof_id = row_id + 1;
 
1517
                        tab->tab_flush_pending = TRUE;
 
1518
                        break;
 
1519
                case XT_LOG_ENT_ROW_NEW_FL:
 
1520
                        len = sizeof(XTactRowAddedEntryDRec);
 
1521
                        row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
 
1522
                        free_ref_id = XT_GET_DISK_4(record->xa.xa_free_list_4);
 
1523
                        if (!in_sequence) {
 
1524
                                size_t red_size;
 
1525
                                /* The record was taken from the free list.
 
1526
                                 * If the operations were in sequence, then this would be
 
1527
                                 * the front of the free list now.
 
1528
                                 * However, because operations are missing, it may no
 
1529
                                 * longer be the front of the free list!
 
1530
                                 * Search and remove:
 
1531
                                 */
 
1532
                                link_rec_id = tab->tab_head_row_free_id;
 
1533
                                prev_link_rec_id = 0;
 
1534
                                while (link_rec_id) {
 
1535
                                        if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, link_rec_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &red_size, &self->st_statistics.st_rec, self)) {
 
1536
                                                xt_log_and_clear_exception(self);
 
1537
                                                break;
 
1538
                                        }
 
1539
                                        if (red_size < sizeof(XTTabRowRefDRec))
 
1540
                                                break;
 
1541
                                        if (link_rec_id == row_id)
 
1542
                                                break;
 
1543
                                        prev_link_rec_id = link_rec_id;
 
1544
                                        link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
 
1545
                                }
 
1546
                                if (link_rec_id == row_id) {
 
1547
                                        /* The block was found on the free list, remove it: */
 
1548
                                        if (prev_link_rec_id) {
 
1549
                                                /* We write the record from position 'link_rec_id' into
 
1550
                                                 * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
 
1551
                                                 */
 
1552
                                                if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, prev_link_rec_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf))
 
1553
                                                        xt_throw(self);
 
1554
                                                tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
 
1555
                                                free_ref_id = tab->tab_head_row_free_id;
 
1556
                                        }
 
1557
                                        else
 
1558
                                                /* The block is at the front of the free list: */
 
1559
                                                free_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
 
1560
                                }
 
1561
                                else {
 
1562
                                        /* Not found? */
 
1563
                                        if (tab->tab_head_row_eof_id < row_id + 1)
 
1564
                                                tab->tab_head_row_eof_id = row_id + 1;
 
1565
                                        break;
 
1566
                                }
 
1567
                                        
 
1568
                        }
 
1569
                        if (tab->tab_head_row_eof_id < row_id + 1)
 
1570
                                tab->tab_head_row_eof_id = row_id + 1;
 
1571
                        tab->tab_head_row_free_id = free_ref_id;
 
1572
                        tab->tab_head_row_fnum--;
 
1573
                        tab->tab_flush_pending = TRUE;
 
1574
                        break;
 
1575
                case XT_LOG_ENT_ROW_FREED:
 
1576
                        row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
 
1577
                        if (!in_sequence) {
 
1578
                                /* Free the row.
 
1579
                                 * Since this operation is being performed out of sequence, we
 
1580
                                 * must assume that some other free and allocation operations
 
1581
                                 * must be missing.
 
1582
                                 * For this reason, we add the row to the front of the
 
1583
                                 * existing free list.
 
1584
                                 */
 
1585
                                XT_SET_DISK_4(record->wr.wr_ref_id_4, tab->tab_head_row_free_id);
 
1586
                        }
 
1587
                        tab->tab_head_row_free_id = row_id;
 
1588
                        tab->tab_head_row_fnum++;
 
1589
                        goto write_row_data;
 
1590
                case XT_LOG_ENT_ROW_ADD_REC:
 
1591
                        row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
 
1592
                        if (!in_sequence) {
 
1593
                                if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &tfer, &self->st_statistics.st_rec, self))
 
1594
                                        xt_throw(self);
 
1595
                                if (tfer == sizeof(XTTabRowRefDRec)) {
 
1596
                                        /* Add a record to the front of the row.
 
1597
                                         * This is easy, but we have to make sure that the next
 
1598
                                         * pointer in the record is correct.
 
1599
                                         */
 
1600
                                        rec_id = XT_GET_DISK_4(record->wr.wr_ref_id_4);
 
1601
                                        if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &tfer, &self->st_statistics.st_rec, self))
 
1602
                                                xt_throw(self);
 
1603
                                        if (tfer == sizeof(XTTabRecHeadDRec) && XT_GET_DISK_4(rec_head.tr_row_id_4) == row_id) {
 
1604
                                                /* This is now the correct next pointer: */
 
1605
                                                xtRecordID next_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
 
1606
                                                if (XT_GET_DISK_4(rec_head.tr_prev_rec_id_4) != next_ref_id &&
 
1607
                                                        rec_id != next_ref_id) {
 
1608
                                                        XT_SET_DISK_4(rec_head.tr_prev_rec_id_4, next_ref_id);
 
1609
                                                        if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
 
1610
                                                                xt_throw(self);
 
1611
                                                        tab->tab_bytes_to_flush += sizeof(XTTabRecHeadDRec);
 
1612
                                                }
 
1613
                                        }
 
1614
                                }
 
1615
                        }
 
1616
                        goto write_row_data;
 
1617
                case XT_LOG_ENT_ROW_SET:
 
1618
                        if (!in_sequence)
 
1619
                                /* This operation is ignored when out of sequence!
 
1620
                                 * The operation is used to remove a record from a row.
 
1621
                                 * This is done automatically when the record is freed.
 
1622
                                 */
 
1623
                                break;
 
1624
                        row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
 
1625
                        write_row_data:
 
1626
                        ASSERT_NS(XT_GET_DISK_4(record->wr.wr_ref_id_4) < tab->tab_head_rec_eof_id);
 
1627
                        if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &record->wr.wr_ref_id_4))
 
1628
                                xt_throw(self);
 
1629
                        tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
 
1630
                        if (tab->tab_head_row_eof_id < row_id + 1)
 
1631
                                tab->tab_head_row_eof_id = row_id + 1;
 
1632
                        tab->tab_flush_pending = TRUE;
 
1633
                        break;
 
1634
                case XT_LOG_ENT_NO_OP:
 
1635
                case XT_LOG_ENT_END_OF_LOG:
 
1636
                        break;
 
1637
        }
 
1638
 
 
1639
        /* Maybe flush? */
 
1640
#ifdef XT_REC_FLUSH_THRESHOLD
 
1641
        if (self->st_statistics.st_rec.ts_write - tab->tab_rec_wr_last_flush > XT_REC_FLUSH_THRESHOLD) {
 
1642
                tab->tab_rec_wr_last_flush = self->st_statistics.st_rec.ts_write;
 
1643
                
 
1644
                if (!xt_async_flush_record_row(tab, FALSE, self))
 
1645
                        xt_throw(self);
 
1646
        }
 
1647
#endif
 
1648
 
 
1649
#ifdef XT_SORT_REC_WRITES
 
1650
        if (in_sequence && xt_sl_get_size(tab->tab_rec_dw_writes))
 
1651
                tab->tab_rec_dw_op_seq = op_seq;
 
1652
        else
 
1653
                tab->tab_head_op_seq = op_seq;
 
1654
#else
 
1655
        tab->tab_head_op_seq = op_seq;
 
1656
#endif
 
1657
        tab->tab_wr_op_seq = op_seq;
 
1658
}
 
1659
 
 
1660
/*
 
1661
 * Apply all operations that have been buffered
 
1662
 * for a particular table.
 
1663
 * Operations are buffered if they are
 
1664
 * read from the log out of sequence.
 
1665
 *
 
1666
 * In this case we buffer, and wait for the
 
1667
 * out of sequence operations to arrive.
 
1668
 *
 
1669
 * When the server is running, this will always be
 
1670
 * the case. A delay occurs while a transaction 
 
1671
 * fills its private log buffer.
 
1672
 */
 
1673
static void xres_apply_operations(XTThreadPtr self, XTWriterStatePtr ws, xtBool in_sequence)
 
1674
{
 
1675
        XTTableHPtr             tab = ws->ws_ot->ot_table;
 
1676
        u_int                   i = 0;
 
1677
        XTOperationPtr  op;
 
1678
        xtBool                  check_index;
 
1679
 
 
1680
// XTDatabaseHPtr db, XTOpenTablePtr ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
 
1681
        xt_sl_lock(self, tab->tab_op_list);
 
1682
        for (;;) {
 
1683
                op = (XTOperationPtr) xt_sl_item_at(tab->tab_op_list, i);
 
1684
                if (!op)
 
1685
                        break;
 
1686
                if (in_sequence && tab->tab_wr_op_seq+1 != op->or_op_seq)
 
1687
                        break;
 
1688
                xt_db_set_size(self, &ws->ws_databuf, (size_t) op->or_op_len);
 
1689
                if (!ws->ws_db->db_xlog.xlog_rnd_read(&ws->ws_seqread, op->or_log_id, op->or_log_offset, (size_t) op->or_op_len, ws->ws_databuf.db_data, NULL, self))
 
1690
                        xt_throw(self);
 
1691
                check_index = ws->ws_in_recover && xt_comp_log_pos(op->or_log_id, op->or_log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
 
1692
                xres_apply_change(self, ws, (XTXactLogBufferDPtr) ws->ws_databuf.db_data, in_sequence, check_index, op->or_op_seq);
 
1693
                if (tab->tab_wr_wake_freeer) {
 
1694
                        if (!XTTableSeq::xt_op_is_before(tab->tab_wr_op_seq, tab->tab_wake_freeer_op))
 
1695
                                xt_wr_wake_freeer(self, ws->ws_db);
 
1696
                }
 
1697
                i++;
 
1698
        }
 
1699
        xt_sl_remove_from_front(self, tab->tab_op_list, i);
 
1700
        xt_sl_unlock(self, tab->tab_op_list);
 
1701
}
 
1702
 
 
1703
/* Check for operations still remaining on tables.
 
1704
 * These operations are applied even though operations
 
1705
 * in sequence are missing.
 
1706
 */
 
1707
static xtBool xres_sync_operations(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
 
1708
{
 
1709
        u_int                   edx;
 
1710
        XTTableEntryPtr te_ptr;
 
1711
        XTTableHPtr             tab;
 
1712
        xtBool                  op_synced = FALSE;
 
1713
 
 
1714
        xt_enum_tables_init(&edx);
 
1715
        while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {
 
1716
                /* Dirty read of tab_op_list OK, here because this is the
 
1717
                 * only thread that updates the list!
 
1718
                 */
 
1719
                if ((tab = te_ptr->te_table)) {
 
1720
                        if (xt_sl_get_size(tab->tab_op_list)) {
 
1721
                                op_synced = TRUE;
 
1722
                                if (xres_open_table(self, ws, te_ptr->te_tab_id))
 
1723
                                        xres_apply_operations(self, ws, FALSE);
 
1724
                        }
 
1725
 
 
1726
                        /* Update the pointer cache: */
 
1727
                        tab->tab_seq.xt_op_seq_set(self, tab->tab_wr_op_seq+1);
 
1728
                        tab->tab_row_eof_id = tab->tab_head_row_eof_id;
 
1729
                        tab->tab_row_free_id = tab->tab_head_row_free_id;
 
1730
                        tab->tab_row_fnum = tab->tab_head_row_fnum;
 
1731
                        tab->tab_rec_eof_id = tab->tab_head_rec_eof_id;
 
1732
                        tab->tab_rec_free_id = tab->tab_head_rec_free_id;
 
1733
                        tab->tab_rec_fnum = tab->tab_head_rec_fnum;
 
1734
                }
 
1735
        }
 
1736
        return op_synced;
 
1737
}
 
1738
 
 
1739
/* Flags handed to xt_tab_check_free_lists() by the post-recovery
 * free-count check, selected by the build-time switches
 * XT_CORRECT_TABLE_FREE_COUNT and XT_CHECK_RECORD_FREE_COUNT.
 */
#ifndef XT_CORRECT_TABLE_FREE_COUNT
#define CORRECT_COUNT		FALSE
#else
#define CORRECT_COUNT		TRUE
#endif

#ifndef XT_CHECK_RECORD_FREE_COUNT
#define CHECK_RECS			FALSE
#else
#define CHECK_RECS			TRUE
#endif

/* The free-count recovery pass is compiled in if either the record or
 * the row free count check is enabled:
 */
#if defined(XT_CHECK_ROW_FREE_COUNT) || defined(XT_CHECK_RECORD_FREE_COUNT)
#define RECOVER_FREE_COUNTS
#endif
 
1752
 
 
1753
#ifdef RECOVER_FREE_COUNTS
/* {CORRECTED-ROW-COUNT}
 * This error can be reproduced by crashing the server during
 * high activity, after a table flush has written the table header.
 *
 * On recovery, the free count "from the future" is used as
 * the starting point for subsequent allocations and frees, so the
 * count is wrong from that point on.
 *
 * Recovering the count only works correctly if a checkpoint completes
 * successfully after the table header is flushed. Basically, writing
 * the table header should be synchronized with writing the end of the
 * checkpoint.
 *
 * Another solution would be to log the count along with the allocate
 * and free commands.
 *
 * The third solution is the one used here: the count is corrected
 * after recovery by running xt_tab_check_free_lists() on every table
 * entry that has a table handle (te_table set).
 */
static void xres_recover_table_free_counts(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
{
	u_int			enum_pos;
	XTTableEntryPtr	entry;

	xt_enum_tables_init(&enum_pos);
	while ((entry = xt_enum_tables_next(self, db, &enum_pos))) {
		if (entry->te_table && xres_open_table(self, ws, entry->te_tab_id))
			xt_tab_check_free_lists(self, ws->ws_ot, CHECK_RECS, CORRECT_COUNT);
	}
}
#endif
 
1789
 
 
1790
/*
 
1791
 * Operations from the log are applied in sequence order.
 
1792
 * If the operations are out of sequence, they are buffered
 
1793
 * until the missing operations appear.
 
1794
 *
 
1795
 * NOTE: No lock is required because there should only be
 
1796
 * one thread that does this!
 
1797
 */
 
1798
xtPublic void xt_xres_apply_in_order(XTThreadPtr self, XTWriterStatePtr ws, xtLogID log_id, xtLogOffset log_offset, XTXactLogBufferDPtr record)
 
1799
{
 
1800
        xtOpSeqNo               op_seq;
 
1801
        xtTableID               tab_id;
 
1802
        size_t                  len;
 
1803
        xtBool                  check_index;
 
1804
 
 
1805
// XTDatabaseHPtr db, XTOpenTablePtr *ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
 
1806
        switch (record->xl.xl_status_1) {
 
1807
                case XT_LOG_ENT_REC_MODIFIED:
 
1808
                case XT_LOG_ENT_UPDATE:
 
1809
                case XT_LOG_ENT_INSERT:
 
1810
                case XT_LOG_ENT_DELETE:
 
1811
                case XT_LOG_ENT_UPDATE_BG:
 
1812
                case XT_LOG_ENT_INSERT_BG:
 
1813
                case XT_LOG_ENT_DELETE_BG:
 
1814
                        len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1) + (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
 
1815
                        op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);
 
1816
                        tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
 
1817
                        break;
 
1818
                case XT_LOG_ENT_UPDATE_FL:
 
1819
                case XT_LOG_ENT_INSERT_FL:
 
1820
                case XT_LOG_ENT_DELETE_FL:
 
1821
                case XT_LOG_ENT_UPDATE_FL_BG:
 
1822
                case XT_LOG_ENT_INSERT_FL_BG:
 
1823
                case XT_LOG_ENT_DELETE_FL_BG:
 
1824
                        len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1) + (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
 
1825
                        op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);
 
1826
                        tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
 
1827
                        break;
 
1828
                case XT_DEFUNKT_REC_FREED:
 
1829
                case XT_DEFUNKT_REC_REMOVED:
 
1830
                case XT_DEFUNKT_REC_REMOVED_EXT:
 
1831
                        /* [(7)] REMOVE is now a extended version of FREE! */
 
1832
                        len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);
 
1833
                        goto fixed_len_data;
 
1834
                case XT_LOG_ENT_REC_REMOVED_BI:
 
1835
                        len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1) + (size_t) XT_GET_DISK_2(record->rb.rb_size_2);
 
1836
                        op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);
 
1837
                        tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
 
1838
                        break;
 
1839
                case XT_LOG_ENT_REC_REMOVED_BI_L:
 
1840
                        len = offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1) + (size_t) XT_GET_DISK_2(record->bl.bl_size_2);
 
1841
                        op_seq = XT_GET_DISK_4(record->bl.bl_op_seq_4);
 
1842
                        tab_id = XT_GET_DISK_4(record->bl.bl_tab_id_4);
 
1843
                        break;
 
1844
                case XT_LOG_ENT_REC_MOVED:
 
1845
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;
 
1846
                        goto fixed_len_data;
 
1847
                case XT_LOG_ENT_REC_CLEANED:
 
1848
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
 
1849
                        goto fixed_len_data;
 
1850
                case XT_LOG_ENT_REC_CLEANED_1:
 
1851
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;
 
1852
                        goto fixed_len_data;
 
1853
                case XT_LOG_ENT_REC_UNLINKED:
 
1854
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
 
1855
                        fixed_len_data:
 
1856
                        op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);
 
1857
                        tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
 
1858
                        break;
 
1859
                case XT_LOG_ENT_ROW_NEW:
 
1860
                        len = sizeof(XTactRowAddedEntryDRec) - 4;
 
1861
                        goto new_row;
 
1862
                case XT_LOG_ENT_ROW_NEW_FL:
 
1863
                        len = sizeof(XTactRowAddedEntryDRec);
 
1864
                        new_row:
 
1865
                        op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);
 
1866
                        tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
 
1867
                        break;
 
1868
                case XT_LOG_ENT_ROW_ADD_REC:
 
1869
                case XT_LOG_ENT_ROW_SET:
 
1870
                case XT_LOG_ENT_ROW_FREED:
 
1871
                        len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + sizeof(XTTabRowRefDRec);
 
1872
                        op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);
 
1873
                        tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);
 
1874
                        break;
 
1875
                case XT_LOG_ENT_NO_OP:
 
1876
                case XT_LOG_ENT_END_OF_LOG:
 
1877
                        return;
 
1878
                default:
 
1879
                        return;
 
1880
        }
 
1881
 
 
1882
        if (!xres_open_table(self, ws, tab_id))
 
1883
                return;
 
1884
 
 
1885
        XTTableHPtr tab = ws->ws_ot->ot_table;
 
1886
 
 
1887
        /* NOTE:
 
1888
         *
 
1889
         * During normal operation this is actually given.
 
1890
         *
 
1891
         * During recovery, it only applies to the record/row files
 
1892
         * The index file is flushed indepently, and changes may
 
1893
         * have been applied to the index (due to a call to flush index,
 
1894
         * which comes as a result of out of memory) that have not been
 
1895
         * applied to the record/row files.
 
1896
         *
 
1897
         * As a result we need to do the index checks that apply to this
 
1898
         * change.
 
1899
         *
 
1900
         * At the moment, I will just do everything, which should not
 
1901
         * hurt!
 
1902
         *
 
1903
         * This error can be repeated by running the test
 
1904
         * runTest(OUT_OF_CACHE_UPDATE_TEST, 32, OUT_OF_CACHE_UPDATE_TEST_UPDATE_COUNT, OUT_OF_CACHE_UPDATE_TEST_SET_SIZE)
 
1905
         * and crashing after a while.
 
1906
         *
 
1907
         * Do this by setting not_this to NULL. This will cause the test to
 
1908
         * hang after a while. After a restart the indexes are corrupt if the
 
1909
         * ws->ws_in_recover condition is not present here. 
 
1910
         */
 
1911
        if (ws->ws_in_recover) {
 
1912
                if (!tab->tab_op_seq_set) {
 
1913
                        /* op_seq <= tab_wr_op_seq + 1: */
 
1914
                        ASSERT(XTTableSeq::xt_op_is_before(op_seq, tab->tab_wr_op_seq+2));
 
1915
                        if (XTTableSeq::xt_op_is_before(op_seq-1, tab->tab_wr_op_seq))
 
1916
                                /* Adjust the operation sequence number: */
 
1917
                                tab->tab_wr_op_seq = op_seq-1;
 
1918
                        tab->tab_head_op_seq = tab->tab_wr_op_seq;
 
1919
                        tab->tab_op_seq_set = TRUE;
 
1920
                }
 
1921
        }
 
1922
 
 
1923
        if (!XTTableSeq::xt_op_is_before(tab->tab_wr_op_seq, op_seq))
 
1924
                return;
 
1925
 
 
1926
        if (tab->tab_wr_op_seq+1 == op_seq) {
 
1927
                /* I could use tab_ind_rec_log_id, but this may be a problem, if
 
1928
                 * recovery does not recover up to the last committed transaction.
 
1929
                 */ 
 
1930
                check_index = ws->ws_in_recover && xt_comp_log_pos(log_id, log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
 
1931
                xres_apply_change(self, ws, record, TRUE, check_index, op_seq);
 
1932
                if (tab->tab_wr_wake_freeer) {
 
1933
                        if (!XTTableSeq::xt_op_is_before(tab->tab_wr_op_seq, tab->tab_wake_freeer_op))
 
1934
                                xt_wr_wake_freeer(self, ws->ws_db);
 
1935
                }
 
1936
 
 
1937
                /* Apply any operations in the list that now follow on...
 
1938
                 * NOTE: the tab_op_list only has be locked for modification.
 
1939
                 * This is because only one thread ever changes the list
 
1940
                 * (on startup and the writer), but the checkpoint thread
 
1941
                 * reads it.
 
1942
                 */             
 
1943
                XTOperationPtr  op;
 
1944
                if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
 
1945
                        if (tab->tab_wr_op_seq+1 == op->or_op_seq) {
 
1946
                                xres_apply_operations(self, ws, TRUE);
 
1947
                        }
 
1948
                }
 
1949
        }
 
1950
        else {
 
1951
                /* Add the operation to the list: */
 
1952
                XTOperationRec op;
 
1953
 
 
1954
                op.or_op_seq = op_seq;
 
1955
                op.or_op_len = len;
 
1956
                op.or_log_id = log_id;
 
1957
                op.or_log_offset = log_offset;
 
1958
                xt_sl_lock(self, tab->tab_op_list);
 
1959
                xt_sl_insert(self, tab->tab_op_list, &op_seq, &op);
 
1960
/*DBG*/
 
1961
                //ASSERT(tab->tab_op_list->sl_usage_count < 1000000);
 
1962
                xt_sl_unlock(self, tab->tab_op_list);
 
1963
        }
 
1964
}
 
1965
 
 
1966
/* ----------------------------------------------------------------------
 
1967
 * CHECKPOINTING FUNCTIONALITY
 
1968
 */
 
1969
 
 
1970
static xtBool xres_delete_data_log(XTDatabaseHPtr db, xtLogID log_id)
 
1971
{
 
1972
        XTDataLogFilePtr        data_log;
 
1973
        char                            path[PATH_MAX];
 
1974
 
 
1975
        db->db_datalogs.dlc_name(PATH_MAX, path, log_id);
 
1976
 
 
1977
        if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))
 
1978
                return FAILED;
 
1979
 
 
1980
        if (xt_fs_exists(path)) {
 
1981
#ifdef DEBUG_LOG_DELETE
 
1982
                printf("-- delete log: %s\n", path);
 
1983
#endif
 
1984
                if (!xt_fs_delete(NULL, path))
 
1985
                        return FAILED;
 
1986
        }
 
1987
        /* The log was deleted: */
 
1988
        if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, TRUE, NULL))
 
1989
                return FAILED;
 
1990
        if (data_log) {
 
1991
                if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_DELETED))
 
1992
                        return FAILED;
 
1993
        }
 
1994
        return OK;
 
1995
}
 
1996
 
 
1997
static int xres_comp_flush_tabs(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
 
1998
{
 
1999
        xtTableID                               tab_id = *((xtTableID *) a);
 
2000
        XTCheckPointTablePtr    cp_tab = (XTCheckPointTablePtr) b;
 
2001
 
 
2002
        if (tab_id < cp_tab->cpt_tab_id)
 
2003
                return -1;
 
2004
        if (tab_id > cp_tab->cpt_tab_id)
 
2005
                return 1;
 
2006
        return 0;
 
2007
}
 
2008
 
 
2009
static void xres_init_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
 
2010
{
 
2011
        xt_init_mutex_with_autoname(self, &cp->cp_state_lock);
 
2012
        cp->cp_inited = TRUE;
 
2013
}
 
2014
 
 
2015
static void xres_free_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
 
2016
{
 
2017
        cp->cp_inited = FALSE;
 
2018
        xt_free_mutex(&cp->cp_state_lock);
 
2019
        if (cp->cp_table_ids) {
 
2020
                xt_free_sortedlist(self, cp->cp_table_ids);
 
2021
                cp->cp_table_ids = NULL;
 
2022
        }
 
2023
}
 
2024
 
 
2025
/*
 
2026
 * Remove the deleted logs so that they can be re-used.
 
2027
 * This is only possible after a checkpoint has been
 
2028
 * written that does _not_ include these logs as logs
 
2029
 * to be deleted!
 
2030
 */
 
2031
static xtBool xres_remove_data_logs(XTDatabaseHPtr db)
 
2032
{
 
2033
        u_int           no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_deleted);
 
2034
        xtLogID         *log_id_ptr;
 
2035
 
 
2036
        for (u_int i=0; i<no_of_logs; i++) {
 
2037
                log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_deleted, i);
 
2038
                if (!db->db_datalogs.dlc_remove_data_log(*log_id_ptr, FALSE))
 
2039
                        return FAILED;
 
2040
        }
 
2041
        xt_sl_set_size(db->db_datalogs.dlc_deleted, 0);
 
2042
        return OK;
 
2043
}
 
2044
 
 
2045
/* ----------------------------------------------------------------------
 
2046
 * INIT & EXIT
 
2047
 */
 
2048
 
 
2049
xtPublic void xt_xres_init(XTThreadPtr self, XTDatabaseHPtr db)
 
2050
{
 
2051
        xtLogID max_log_id;
 
2052
 
 
2053
        xt_init_mutex_with_autoname(self, &db->db_cp_lock);
 
2054
        xt_init_cond(self, &db->db_cp_cond);
 
2055
        xt_init_mutex_with_autoname(self, &db->db_fl_lock);
 
2056
        
 
2057
        xres_init_checkpoint_state(self, &db->db_cp_state);
 
2058
        db->db_restart.xres_init(self, db, &db->db_wr_log_id, &db->db_wr_log_offset, &max_log_id);
 
2059
 
 
2060
        /* It is also the position where transactions will start writing the
 
2061
         * log:
 
2062
         */
 
2063
        if (!db->db_xlog.xlog_set_write_offset(db->db_wr_log_id, db->db_wr_log_offset, max_log_id, self))
 
2064
                xt_throw(self);
 
2065
}
 
2066
 
 
2067
xtPublic void xt_xres_exit(XTThreadPtr self, XTDatabaseHPtr db)
 
2068
{
 
2069
        db->db_restart.xres_exit(self);
 
2070
        xres_free_checkpoint_state(self, &db->db_cp_state);
 
2071
        xt_free_mutex(&db->db_cp_lock);
 
2072
        xt_free_cond(&db->db_cp_cond);
 
2073
        xt_free_mutex(&db->db_fl_lock);
 
2074
}
 
2075
 
 
2076
/* ----------------------------------------------------------------------
 
2077
 * RESTART FUNCTIONALITY
 
2078
 */
 
2079
 
 
2080
/*
 
2081
 * Restart the database. This function loads the restart position, and
 
2082
 * applies all changes in the logs, until the end of the log, or
 
2083
 * a corrupted record is found.
 
2084
 *
 
2085
 * The restart position is the position in the log where we know that
 
2086
 * all the changes up to that point have been flushed to the
 
2087
 * database.
 
2088
 *
 
2089
 * This is called the checkpoint position. The checkpoint position
 
2090
 * is written alternatively to 2 restart files.
 
2091
 *
 
2092
 * To make a checkpoint:
 
2093
 * Get the current log writer log offset.
 
2094
 * For each table:
 
2095
 *    Get the log offset of the next operation on the table, if an
 
2096
 *    operation is queued for the table.
 
2097
 *    Flush that table, and the operation sequence to the table.
 
2098
 * For each unclean transaction:
 
2099
 *    Get the log offset of the begin of the transaction.
 
2100
 * Write the lowest of all log offsets to the restart file!
 
2101
 */
 
2102
 
 
2103
void XTXactRestart::xres_init(XTThreadPtr self, XTDatabaseHPtr db, xtLogID *log_id, xtLogOffset *log_offset, xtLogID *max_log_id)
 
2104
{
 
2105
        char                                    path[PATH_MAX];
 
2106
        XTOpenFilePtr                   of = NULL;
 
2107
        XTXlogCheckpointDPtr    res_1_buffer = NULL;
 
2108
        XTXlogCheckpointDPtr    res_2_buffer = NULL;
 
2109
        XTXlogCheckpointDPtr    use_buffer;
 
2110
        xtLogID                                 ind_rec_log_id = 0;
 
2111
        xtLogOffset                             ind_rec_log_offset = 0;
 
2112
 
 
2113
        enter_();
 
2114
        xres_db = db;
 
2115
 
 
2116
        ASSERT(!self->st_database);
 
2117
        /* The following call stack:
 
2118
         * XTDatabaseLog::xlog_flush_pending()
 
2119
         * XTDatabaseLog::xlog_flush()
 
2120
         * xt_xlog_flush_log()
 
2121
         * xt_flush_indices()
 
2122
         * idx_out_of_memory_failure()
 
2123
         * xt_idx_delete()
 
2124
         * xres_remove_index_entries()
 
2125
         * xres_apply_change()
 
2126
         * xt_xres_apply_in_order()
 
2127
         * XTXactRestart::xres_restart()
 
2128
         * XTXactRestart::xres_init()
 
2129
         * Leads to st_database being used!
 
2130
         */
 
2131
        self->st_database = db;
 
2132
 
 
2133
#ifdef SKIP_STARTUP_CHECKPOINT
 
2134
        /* When debugging, we do not checkpoint immediately, just in case
 
2135
         * we detect a problem during recovery.
 
2136
         */
 
2137
        xres_cp_required = FALSE;
 
2138
#else
 
2139
        xres_cp_required = TRUE;
 
2140
#endif
 
2141
        xres_cp_number = 0;
 
2142
        try_(a) {
 
2143
 
 
2144
                /* Figure out which restart file to use.
 
2145
                 */
 
2146
                xres_name(PATH_MAX, path, 1);
 
2147
                if ((of = xt_open_file(self, path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))) {
 
2148
                        size_t res_1_size;
 
2149
 
 
2150
                        res_1_size = (size_t) xt_seek_eof_file(self, of);
 
2151
                        res_1_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_1_size);
 
2152
                        if (!xt_pread_file(of, 0, res_1_size, res_1_size, res_1_buffer, NULL, &self->st_statistics.st_x, self))
 
2153
                                xt_throw(self);
 
2154
                        xt_close_file(self, of);
 
2155
                        of = NULL;
 
2156
                        if (!xres_check_checksum(res_1_buffer, res_1_size)) {
 
2157
                                xt_free(self, res_1_buffer);
 
2158
                                res_1_buffer = NULL;
 
2159
                        }
 
2160
                }
 
2161
 
 
2162
                xres_name(PATH_MAX, path, 2);
 
2163
                if ((of = xt_open_file(self, path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))) {
 
2164
                        size_t res_2_size;
 
2165
 
 
2166
                        res_2_size = (size_t) xt_seek_eof_file(self, of);
 
2167
                        res_2_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_2_size);
 
2168
                        if (!xt_pread_file(of, 0, res_2_size, res_2_size, res_2_buffer, NULL, &self->st_statistics.st_x, self))
 
2169
                                xt_throw(self);
 
2170
                        xt_close_file(self, of);
 
2171
                        of = NULL;
 
2172
                        if (!xres_check_checksum(res_2_buffer, res_2_size)) {
 
2173
                                xt_free(self, res_2_buffer);
 
2174
                                res_2_buffer = NULL;
 
2175
                        }
 
2176
                }
 
2177
 
 
2178
                if (res_1_buffer && res_2_buffer) {
 
2179
                        if (xt_comp_log_pos(
 
2180
                                XT_GET_DISK_4(res_1_buffer->xcp_log_id_4),
 
2181
                                XT_GET_DISK_6(res_1_buffer->xcp_log_offs_6),
 
2182
                                XT_GET_DISK_4(res_2_buffer->xcp_log_id_4),
 
2183
                                XT_GET_DISK_6(res_2_buffer->xcp_log_offs_6)) > 0) {
 
2184
                                /* The first log is the further along than the second: */
 
2185
                                xt_free(self, res_2_buffer);
 
2186
                                res_2_buffer = NULL;
 
2187
                        }
 
2188
                        else {
 
2189
                                if (XT_GET_DISK_6(res_1_buffer->xcp_chkpnt_no_6) >
 
2190
                                        XT_GET_DISK_6(res_2_buffer->xcp_chkpnt_no_6)) {
 
2191
                                        xt_free(self, res_2_buffer);
 
2192
                                        res_2_buffer = NULL;
 
2193
                                }
 
2194
                                else {
 
2195
                                        xt_free(self, res_1_buffer);
 
2196
                                        res_1_buffer = NULL;
 
2197
                                }
 
2198
                        }
 
2199
                }
 
2200
 
 
2201
                if (res_1_buffer) {
 
2202
                        use_buffer = res_1_buffer;
 
2203
                        xres_next_res_no = 2;
 
2204
                }
 
2205
                else {
 
2206
                        use_buffer = res_2_buffer;
 
2207
                        xres_next_res_no = 1;
 
2208
                }
 
2209
 
 
2210
                /* Read the checkpoint data: */
 
2211
                if (use_buffer) {
 
2212
                        u_int           no_of_logs;
 
2213
                        xtLogID         xt_log_id;
 
2214
                        xtTableID       xt_tab_id;
 
2215
 
 
2216
                        xres_cp_number = XT_GET_DISK_6(use_buffer->xcp_chkpnt_no_6);
 
2217
                        xres_cp_log_id = XT_GET_DISK_4(use_buffer->xcp_log_id_4);
 
2218
                        xres_cp_log_offset = XT_GET_DISK_6(use_buffer->xcp_log_offs_6);
 
2219
                        xt_tab_id = XT_GET_DISK_4(use_buffer->xcp_tab_id_4);
 
2220
                        if (xt_tab_id > db->db_curr_tab_id)
 
2221
                                db->db_curr_tab_id = xt_tab_id;
 
2222
                        db->db_xn_curr_id = XT_GET_DISK_4(use_buffer->xcp_xact_id_4);
 
2223
                        ind_rec_log_id = XT_GET_DISK_4(use_buffer->xcp_ind_rec_log_id_4);
 
2224
                        ind_rec_log_offset = XT_GET_DISK_6(use_buffer->xcp_ind_rec_log_offs_6);
 
2225
                        no_of_logs = XT_GET_DISK_2(use_buffer->xcp_log_count_2);
 
2226
 
 
2227
#ifdef DEBUG_PRINT
 
2228
                        printf("CHECKPOINT log=%d offset=%d ", (int) xres_cp_log_id, (int) xres_cp_log_offset);
 
2229
                        if (no_of_logs)
 
2230
                                printf("DELETED LOGS: ");
 
2231
#endif
 
2232
 
 
2233
                        /* Logs that are deleted are locked until _after_ the next
 
2234
                         * checkpoint.
 
2235
                         *
 
2236
                         * To prevent the following problem from occuring:
 
2237
                         * - Recovery is performed, and log X is deleted 
 
2238
                         * - After delete a log is free for re-use.
 
2239
                         *   New data is writen to log X.
 
2240
                         * - Server crashes.
 
2241
                         * - Recovery is performed from previous checkpoint,
 
2242
                         *   and log X is deleted again.
 
2243
                         *
 
2244
                         * To lock the logs the are placed on the deleted list.
 
2245
                         * After the next checkpoint, all logs on this list
 
2246
                         * will be removed.
 
2247
                         */
 
2248
                        for (u_int i=0; i<no_of_logs; i++) {
 
2249
                                xt_log_id = (xtLogID) XT_GET_DISK_2(use_buffer->xcp_del_log[i]);
 
2250
#ifdef DEBUG_PRINT
 
2251
                                if (i != 0)
 
2252
                                        printf(", ");
 
2253
                                printf("%d", (int) xt_log_id);
 
2254
#endif
 
2255
#ifdef DEBUG_KEEP_LOGS
 
2256
                                xt_dl_set_to_delete(self, db, xt_log_id);
 
2257
#else
 
2258
                                if (!xres_delete_data_log(db, xt_log_id))
 
2259
                                        xt_throw(self);
 
2260
#endif
 
2261
                        }
 
2262
 
 
2263
#ifdef DEBUG_PRINT
 
2264
                        printf("\n");
 
2265
#endif
 
2266
                }
 
2267
                else {
 
2268
                        /* Try to determine the correct start point. */
 
2269
                        xres_cp_number = 0;
 
2270
                        xres_cp_log_id = xt_xlog_get_min_log(self, db);
 
2271
                        xres_cp_log_offset = 0;
 
2272
                        ind_rec_log_id = xres_cp_log_id;
 
2273
                        ind_rec_log_offset = xres_cp_log_offset;
 
2274
 
 
2275
#ifdef DEBUG_PRINT
 
2276
                        printf("CHECKPOINT log=1 offset=0\n");
 
2277
#endif
 
2278
                }
 
2279
 
 
2280
                if (res_1_buffer) {
 
2281
                        xt_free(self, res_1_buffer);
 
2282
                        res_1_buffer = NULL;
 
2283
                }
 
2284
                if (res_2_buffer) {
 
2285
                        xt_free(self, res_2_buffer);
 
2286
                        res_2_buffer = NULL;
 
2287
                }
 
2288
 
 
2289
                if (!xres_restart(self, log_id, log_offset, ind_rec_log_id, ind_rec_log_offset, max_log_id))
 
2290
                        xt_throw(self);
 
2291
        }
 
2292
        catch_(a) {
 
2293
                XTException e;
 
2294
 
 
2295
                xt_enter_exception_handler(self, &e);
 
2296
                self->st_database = NULL;
 
2297
                if (of)
 
2298
                        xt_close_file_ns(of);
 
2299
                if (res_1_buffer)
 
2300
                        xt_free_ns(res_1_buffer);
 
2301
                if (res_2_buffer)
 
2302
                        xt_free_ns(res_2_buffer);
 
2303
                xres_exit(self);
 
2304
                xt_exit_exception_handler(self, &e);
 
2305
                xt_throw(self);
 
2306
        }
 
2307
        cont_(a);
 
2308
        self->st_database = NULL;
 
2309
 
 
2310
        exit_();
 
2311
}
 
2312
 
 
2313
void XTXactRestart::xres_exit(XTThreadPtr XT_UNUSED(self))
 
2314
{
 
2315
}
 
2316
 
 
2317
void XTXactRestart::xres_name(size_t size, char *path, xtLogID log_id)
 
2318
{
 
2319
        char name[50];
 
2320
 
 
2321
        sprintf(name, "restart-%lu.xt", (u_long) log_id);
 
2322
        xt_strcpy(size, path, xres_db->db_main_path);
 
2323
        xt_add_system_dir(size, path);
 
2324
        xt_add_dir_char(size, path);
 
2325
        xt_strcat(size, path, name);
 
2326
}
 
2327
 
 
2328
xtBool XTXactRestart::xres_check_checksum(XTXlogCheckpointDPtr buffer, size_t size)
 
2329
{
 
2330
        size_t          head_size;
 
2331
 
 
2332
        /* The minimum size: */
 
2333
        if (size < offsetof(XTXlogCheckpointDRec, xcp_head_size_4) + 4)
 
2334
                return FAILED;
 
2335
 
 
2336
        /* Check the sizes: */
 
2337
        head_size = XT_GET_DISK_4(buffer->xcp_head_size_4);
 
2338
        if (size < head_size)
 
2339
                return FAILED;
 
2340
 
 
2341
        if (XT_GET_DISK_2(buffer->xcp_checksum_2) != xt_get_checksum(((xtWord1 *) buffer) + 2, size - 2, 1))
 
2342
                return FAILED;
 
2343
 
 
2344
        if (XT_GET_DISK_2(buffer->xcp_version_2) != XT_CHECKPOINT_VERSION)
 
2345
                return FAILED;
 
2346
 
 
2347
        return OK;
 
2348
}
 
2349
 
 
2350
void XTXactRestart::xres_recover_progress(XTThreadPtr self, XTOpenFilePtr *of, int perc)
 
2351
{
 
2352
#ifdef XT_USE_GLOBAL_DB
 
2353
        if (perc > 100) {
 
2354
                char file_path[PATH_MAX];
 
2355
 
 
2356
                if (*of) {
 
2357
                        xt_close_file(self, *of);
 
2358
                        *of = NULL;
 
2359
                }
 
2360
                xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
 
2361
                xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
 
2362
                if (xt_fs_exists(file_path))
 
2363
                        xt_fs_delete(self, file_path);
 
2364
        }
 
2365
        else {
 
2366
                char number[40];
 
2367
 
 
2368
                if (!*of) {
 
2369
                        char file_path[PATH_MAX];
 
2370
 
 
2371
                        xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
 
2372
                        xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
 
2373
                        *of = xt_open_file(self, file_path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 16*1024*1024);
 
2374
                        xt_set_eof_file(self, *of, 0);
 
2375
                }
 
2376
 
 
2377
                sprintf(number, "%d", perc);
 
2378
                if (!xt_pwrite_file(*of, 0, strlen(number), number, &self->st_statistics.st_x, self))
 
2379
                        xt_throw(self);
 
2380
                if (!xt_flush_file(*of, &self->st_statistics.st_x, self))
 
2381
                        xt_throw(self);
 
2382
        }
 
2383
#endif
 
2384
}
 
2385
 
 
2386
xtBool XTXactRestart::xres_restart(XTThreadPtr self, xtLogID *log_id, xtLogOffset *log_offset, xtLogID ind_rec_log_id, xtLogOffset ind_rec_log_offset, xtLogID *max_log_id)
 
2387
{
 
2388
        xtBool                                  ok = TRUE;
 
2389
        XTDatabaseHPtr                  db = xres_db;
 
2390
        XTXactLogBufferDPtr             record;
 
2391
        xtXactID                                xn_id;
 
2392
        XTXactDataPtr                   xact;
 
2393
        xtTableID                               tab_id;
 
2394
        XTWriterStateRec                ws;
 
2395
        off_t                                   bytes_read = 0;
 
2396
        off_t                                   bytes_to_read;
 
2397
        volatile xtBool                 print_progress = FALSE;
 
2398
        volatile off_t                  perc_size = 0, next_goal = 0;
 
2399
        int                                             perc_complete = 1, perc_to_write = 1;
 
2400
        XTOpenFilePtr                   progress_file = NULL;
 
2401
        xtBool                                  min_ram_xn_id_set = FALSE;
 
2402
        u_int                                   log_count;
 
2403
        time_t                                  start_time;
 
2404
 
 
2405
        memset(&ws, 0, sizeof(ws));
 
2406
 
 
2407
        ws.ws_db = db;
 
2408
        ws.ws_in_recover = TRUE;
 
2409
        ws.ws_ind_rec_log_id = ind_rec_log_id;
 
2410
        ws.ws_ind_rec_log_offset = ind_rec_log_offset;
 
2411
 
 
2412
        /* Initialize the data log buffer (required if extended data is
 
2413
         * referenced).
 
2414
         * Note: this buffer is freed later. It is part of the thread
 
2415
         * "open database" state, and this means that a thread
 
2416
         * may not have another database open (in use) when
 
2417
         * it calls this functions.
 
2418
         */
 
2419
        self->st_dlog_buf.dlb_init(db, xt_db_log_buffer_size);
 
2420
 
 
2421
        if (!db->db_xlog.xlog_seq_init(&ws.ws_seqread, xt_db_log_buffer_size, TRUE))
 
2422
                return FAILED;
 
2423
 
 
2424
        bytes_to_read = xres_bytes_to_read(self, db, &log_count, max_log_id);
 
2425
        /* Don't print anything about recovering an empty database: */
 
2426
        if (bytes_to_read != 0)
 
2427
                xt_logf(XT_NT_INFO, "PBXT: Recovering from %lu-%llu, bytes to read: %llu\n", (u_long) xres_cp_log_id, (u_llong) xres_cp_log_offset, (u_llong) bytes_to_read);
 
2428
 
 
2429
        print_progress = FALSE;
 
2430
        start_time = time(NULL);
 
2431
        perc_size = bytes_to_read / 100;
 
2432
        next_goal = perc_size;
 
2433
 
 
2434
        if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, xres_cp_log_id, xres_cp_log_offset, FALSE)) {
 
2435
                ok = FALSE;
 
2436
                goto failed;
 
2437
        }
 
2438
 
 
2439
        try_(a) {
 
2440
                for (;;) {
 
2441
                        if (!db->db_xlog.xlog_seq_next(&ws.ws_seqread, &record, TRUE, self)) {
 
2442
                                ok = FALSE;
 
2443
                                break;
 
2444
                        }
 
2445
                        /* Increment before. If record is NULL then xseq_record_len will be zero,
 
2446
                         * UNLESS the last record was of type XT_LOG_ENT_END_OF_LOG 
 
2447
                         * which fills the log to align to block of size 512.
 
2448
                         */
 
2449
                        bytes_read += ws.ws_seqread.xseq_record_len;
 
2450
                        if (!record)
 
2451
                                break;
 
2452
#ifdef PRINT_LOG_ON_RECOVERY
 
2453
                        xt_print_log_record(ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
 
2454
#endif
 
2455
                        if (next_goal && bytes_read >= next_goal) {
 
2456
                                while (bytes_read >= next_goal) {
 
2457
                                        next_goal += perc_size;
 
2458
                                        perc_complete++;
 
2459
                                }
 
2460
                                if (!print_progress) {
 
2461
                                        if (time(NULL) - start_time > 2)
 
2462
                                                print_progress = TRUE;
 
2463
                                }
 
2464
                                if (print_progress) {
 
2465
                                        while (perc_to_write < perc_complete) {
 
2466
                                                if (((perc_to_write - 1) % 25) == 0)
 
2467
                                                        xt_logf(XT_NT_INFO, "PBXT: ");
 
2468
                                                if ((perc_to_write % 25) == 0)
 
2469
                                                        xt_logf(XT_NT_INFO, "%2d\n", (int) perc_to_write);
 
2470
                                                else
 
2471
                                                        xt_logf(XT_NT_INFO, "%2d ", (int) perc_to_write);
 
2472
                                                xt_log_flush(self);
 
2473
                                                xres_recover_progress(self, &progress_file, perc_to_write);
 
2474
                                                perc_to_write++;
 
2475
                                        }
 
2476
                                }
 
2477
                        }
 
2478
                        switch (record->xl.xl_status_1) {
 
2479
                                case XT_LOG_ENT_HEADER:
 
2480
                                        break;
 
2481
                                case XT_LOG_ENT_NEW_LOG: {
 
2482
                                        /* Adjust the bytes read for the fact that logs are written
 
2483
                                         * on 512 byte boundaries.
 
2484
                                         */
 
2485
                                        off_t offs, eof = ws.ws_seqread.xseq_log_eof;
 
2486
 
 
2487
                                        offs = ws.ws_seqread.xseq_rec_log_offset + ws.ws_seqread.xseq_record_len;
 
2488
                                        if (eof > offs)
 
2489
                                                bytes_read += eof - offs;
 
2490
                                        if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, TRUE))
 
2491
                                                xt_throw(self);
 
2492
                                        break;
 
2493
                                }
 
2494
                                case XT_LOG_ENT_NEW_TAB:
 
2495
                                        tab_id = XT_GET_DISK_4(record->xt.xt_tab_id_4);
 
2496
                                        if (tab_id > db->db_curr_tab_id)
 
2497
                                                db->db_curr_tab_id = tab_id;
 
2498
                                        break;
 
2499
                                case XT_LOG_ENT_UPDATE_BG:
 
2500
                                case XT_LOG_ENT_INSERT_BG:
 
2501
                                case XT_LOG_ENT_DELETE_BG:
 
2502
                                        xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
 
2503
                                        goto start_xact;
 
2504
                                case XT_LOG_ENT_UPDATE_FL_BG:
 
2505
                                case XT_LOG_ENT_INSERT_FL_BG:
 
2506
                                case XT_LOG_ENT_DELETE_FL_BG:
 
2507
                                        xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
 
2508
                                        start_xact:
 
2509
                                        if (xt_xn_is_before(db->db_xn_curr_id, xn_id))
 
2510
                                                db->db_xn_curr_id = xn_id;
 
2511
 
 
2512
                                        if (!(xact = xt_xn_add_old_xact(db, xn_id, self)))
 
2513
                                                xt_throw(self);
 
2514
 
 
2515
                                        xact->xd_begin_log = ws.ws_seqread.xseq_rec_log_id;
 
2516
                                        xact->xd_begin_offset = ws.ws_seqread.xseq_rec_log_offset;
 
2517
 
 
2518
                                        xact->xd_end_xn_id = xn_id;
 
2519
                                        xact->xd_end_time = db->db_xn_end_time;
 
2520
                                        xact->xd_flags = (XT_XN_XAC_LOGGED | XT_XN_XAC_ENDED | XT_XN_XAC_RECOVERED | XT_XN_XAC_SWEEP);
 
2521
 
 
2522
                                        /* This may affect the "minimum RAM transaction": */
 
2523
                                        if (!min_ram_xn_id_set || xt_xn_is_before(xn_id, db->db_xn_min_ram_id)) {
 
2524
                                                min_ram_xn_id_set = TRUE;
 
2525
                                                db->db_xn_min_ram_id = xn_id;
 
2526
                                        }
 
2527
                                        xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
 
2528
                                        break;
 
2529
                                case XT_LOG_ENT_COMMIT:
 
2530
                                case XT_LOG_ENT_ABORT:
 
2531
                                        xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
 
2532
                                        if ((xact = xt_xn_get_xact(db, xn_id, self))) {
 
2533
                                                xact->xd_end_xn_id = xn_id;
 
2534
                                                xact->xd_flags |= XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
 
2535
                                                xact->xd_flags &= ~XT_XN_XAC_RECOVERED; // We can expect an end record on cleanup!
 
2536
                                                xact->xd_flags &= ~XT_XN_XAC_PREPARED;  // Prepared transactions cannot be swept!
 
2537
                                                if (record->xl.xl_status_1 == XT_LOG_ENT_COMMIT)
 
2538
                                                        xact->xd_flags |= XT_XN_XAC_COMMITTED;
 
2539
                                                if (xt_sl_get_size(db->db_xn_xa_list) > 0)
 
2540
                                                        xt_xn_delete_xa_data_by_xact(db, xn_id, self);
 
2541
                                        }
 
2542
                                        break;
 
2543
                                case XT_LOG_ENT_CLEANUP:
 
2544
                                        /* The transaction was cleaned up: */
 
2545
                                        xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
 
2546
                                        xt_xn_delete_xact(db, xn_id, self);
 
2547
                                        break;
 
2548
                                case XT_LOG_ENT_OP_SYNC:
 
2549
                                        xres_sync_operations(self, db, &ws);
 
2550
                                        break;
 
2551
                                case XT_LOG_ENT_DEL_LOG:
 
2552
                                        xtLogID rec_log_id;
 
2553
 
 
2554
                                        rec_log_id = XT_GET_DISK_4(record->xl.xl_log_id_4);
 
2555
                                        xt_dl_set_to_delete(self, db, rec_log_id);
 
2556
                                        break;
 
2557
                                case XT_LOG_ENT_PREPARE:
 
2558
                                        xn_id = XT_GET_DISK_4(record->xp.xp_xact_id_4);
 
2559
                                        if ((xact = xt_xn_get_xact(db, xn_id, self))) {
 
2560
                                                xact->xd_flags |= XT_XN_XAC_PREPARED;
 
2561
                                                if (!xt_xn_store_xa_data(db, xn_id, record->xp.xp_xa_len_1, record->xp.xp_xa_data, self))
 
2562
                                                        xt_throw(self);
 
2563
                                        }
 
2564
                                        break;
 
2565
                                default:
 
2566
                                        xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
 
2567
                                        break;
 
2568
                        }
 
2569
                }
 
2570
 
 
2571
                if (xres_sync_operations(self, db, &ws)) {
 
2572
                        XTactOpSyncEntryDRec    op_sync;
 
2573
                        time_t                                  now = time(NULL);
 
2574
 
 
2575
                        op_sync.os_status_1 = XT_LOG_ENT_OP_SYNC;
 
2576
                        op_sync.os_checksum_1 = XT_CHECKSUM_1(now) ^ XT_CHECKSUM_1(ws.ws_seqread.xseq_rec_log_id);
 
2577
                        XT_SET_DISK_4(op_sync.os_time_4, (xtWord4) now);
 
2578
                        /* TODO: If this is done, check to see that
 
2579
                         * the byte written here are read back by the writter.
 
2580
                         * This is in order to be in sync with 'xl_log_bytes_written'.
 
2581
                         * i.e. xl_log_bytes_written == xl_log_bytes_read
 
2582
                         */
 
2583
                        if (!db->db_xlog.xlog_write_thru(&ws.ws_seqread, sizeof(XTactOpSyncEntryDRec), (xtWord1 *) &op_sync, self))
 
2584
                                xt_throw(self);
 
2585
                }
 
2586
 
 
2587
#ifdef XT_SORT_REC_WRITES
 
2588
                /* Flush all tables where we still have cached writes: */
 
2589
                xt_xres_flush_all(self, &ws);
 
2590
#endif
 
2591
        }
 
2592
        catch_(a) {
 
2593
                ok = FALSE;
 
2594
        }
 
2595
        cont_(a);
 
2596
 
 
2597
        if (ok) {
 
2598
                if (print_progress) {
 
2599
                        while (perc_complete <= 100) {
 
2600
                                if (((perc_complete - 1) % 25) == 0)
 
2601
                                        xt_logf(XT_NT_INFO, "PBXT: ");
 
2602
                                if ((perc_complete % 25) == 0)
 
2603
                                        xt_logf(XT_NT_INFO, "%2d\n", (int) perc_complete);
 
2604
                                else
 
2605
                                        xt_logf(XT_NT_INFO, "%2d ", (int) perc_complete);
 
2606
                                xt_log_flush(self);
 
2607
                                xres_recover_progress(self, &progress_file, perc_complete);
 
2608
                                perc_complete++;
 
2609
                        }
 
2610
                }
 
2611
                if (bytes_to_read != 0)
 
2612
                        xt_logf(XT_NT_INFO, "PBXT: Recovery complete at %lu-%llu, bytes read: %llu\n", (u_long) ws.ws_seqread.xseq_rec_log_id, (u_llong) ws.ws_seqread.xseq_rec_log_offset, (u_llong) bytes_read);
 
2613
 
 
2614
                *log_id = ws.ws_seqread.xseq_rec_log_id;
 
2615
                *log_offset = ws.ws_seqread.xseq_rec_log_offset;
 
2616
 
 
2617
                if (!min_ram_xn_id_set)
 
2618
                        /* This is true because if no transaction was placed in RAM then
 
2619
                         * the next transaction in RAM will have the next ID: */
 
2620
                        db->db_xn_min_ram_id = db->db_xn_curr_id + 1;
 
2621
 
 
2622
#ifdef RECOVER_FREE_COUNTS
 
2623
                if (xres_cp_log_id != *log_id || xres_cp_log_offset != *log_offset) {
 
2624
                        /* Recovery took place, correct the row count! */
 
2625
                        xres_recover_table_free_counts(self, db, &ws);
 
2626
                }
 
2627
#endif
 
2628
        }
 
2629
 
 
2630
        failed:
 
2631
        xt_free_writer_state(self, &ws);
 
2632
        self->st_dlog_buf.dlb_exit(self);
 
2633
        xres_recover_progress(self, &progress_file, 101);
 
2634
        return ok;
 
2635
}
 
2636
 
 
2637
xtBool XTXactRestart::xres_is_checkpoint_pending(xtLogID curr_log_id, xtLogOffset curr_log_offset)
 
2638
{
 
2639
        return xt_bytes_since_last_checkpoint(xres_db, curr_log_id, curr_log_offset) >= xt_db_checkpoint_frequency;
 
2640
}
 
2641
 
 
2642
/*
 
2643
 * Calculate the bytes to be read for recovery.
 
2644
 * This is only an estimate of the number of bytes that
 
2645
 * will be read.
 
2646
 */
 
2647
off_t XTXactRestart::xres_bytes_to_read(XTThreadPtr self, XTDatabaseHPtr db, u_int *log_count, xtLogID *max_log_id)
 
2648
{
 
2649
        off_t                           to_read = 0, eof;
 
2650
        xtLogID                         log_id = xres_cp_log_id;
 
2651
        char                            log_path[PATH_MAX];
 
2652
        XTOpenFilePtr           of;
 
2653
        XTXactLogHeaderDRec     log_head;
 
2654
        xtWord1                         end_head[4];
 
2655
        size_t                          head_size;
 
2656
        size_t                          red_size;
 
2657
 
 
2658
        *max_log_id = log_id;
 
2659
        *log_count = 0;
 
2660
        for (;;) {
 
2661
                db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
 
2662
                of = NULL;
 
2663
                if (!xt_open_file_ns(&of, log_path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))
 
2664
                        xt_throw(self);
 
2665
                if (!of)
 
2666
                        break;
 
2667
                pushr_(xt_close_file, of);
 
2668
 
 
2669
                /* Check the first record of the log, to see if it is valid. */
 
2670
                if (!xt_pread_file(of, 0, sizeof(XTXactLogHeaderDRec), 0, (xtWord1 *) &log_head, &red_size, &self->st_statistics.st_xlog, self))
 
2671
                        xt_throw(self);
 
2672
                /* The minimum size (old log size): */
 
2673
                if (red_size < XT_MIN_LOG_HEAD_SIZE)
 
2674
                        goto header_corrupt;
 
2675
                head_size = XT_GET_DISK_4(log_head.xh_size_4);
 
2676
                if (log_head.xh_status_1 != XT_LOG_ENT_HEADER)
 
2677
                        goto header_corrupt;
 
2678
                if (log_head.xh_checksum_1 != XT_CHECKSUM_1(log_id))
 
2679
                        goto header_corrupt;
 
2680
                if (!xt_pread_file(of, head_size - 4, 4, 0, end_head, &red_size, &self->st_statistics.st_xlog, self))
 
2681
                        xt_throw(self);
 
2682
                if (red_size != 4 || XT_GET_DISK_4(end_head) != XT_LOG_FILE_MAGIC)
 
2683
                        goto header_corrupt;
 
2684
                if (head_size > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
 
2685
                        if (XT_GET_DISK_4(log_head.xh_log_id_4) != log_id)
 
2686
                                goto header_corrupt;
 
2687
                }
 
2688
                if (head_size > offsetof(XTXactLogHeaderDRec, xh_version_2) + 2) {
 
2689
                        if (XT_GET_DISK_2(log_head.xh_version_2) > XT_LOG_VERSION_NO)                           
 
2690
                                xt_throw_ulxterr(XT_CONTEXT, XT_ERR_NEW_TYPE_OF_XLOG, (u_long) log_id);
 
2691
                }
 
2692
 
 
2693
                eof = xt_seek_eof_file(self, of);
 
2694
                freer_(); // xt_close_file(of)
 
2695
                if (log_id == xres_cp_log_id)
 
2696
                        to_read += (eof - xres_cp_log_offset);
 
2697
                else
 
2698
                        to_read += eof;
 
2699
                (*log_count)++;
 
2700
                *max_log_id = log_id;
 
2701
                log_id++;
 
2702
        }
 
2703
        return to_read;
 
2704
 
 
2705
        header_corrupt:
 
2706
        if (log_id == xres_cp_log_id && xres_cp_log_offset > (xtLogOffset) sizeof(XTXactLogHeaderDRec))
 
2707
                xt_throw_ulxterr(XT_CONTEXT, XT_ERR_LOG_HEADER_CORRUPT, (u_long) log_id);
 
2708
 
 
2709
        freer_(); // xt_close_file(of)
 
2710
        return to_read;
 
2711
 
 
2712
}
 
2713
 
 
2714
 
 
2715
/* ----------------------------------------------------------------------
 * C H E C K P O I N T    P R O C E S S
 */

#if defined(NEVER_CHECKPOINT)
/* Debug switch, consulted by xres_cp_checkpoint() when NEVER_CHECKPOINT is defined. */
xtBool no_checkpoint = TRUE;
#endif

/* Checkpoint types, as selected by xres_cp_checkpoint(): */
#define XT_CHECKPOINT_IF_NO_ACTIVITY		0	/* checkpoint not forced */
#define XT_CHECKPOINT_PAUSE_IF_ACTIVITY		1	/* forced, xres_cp_required not set */
#define XT_CHECKPOINT_NO_PAUSE				2	/* forced, xres_cp_required set */

/* Number of async flushes xres_cp_async_checkpoint() starts before it
 * waits for their results.
 */
#define XT_ASYNC_THREAD_FLUSH_LIMIT			200
 
2728
 
 
2729
static void xres_cp_async_checkpoint(XTThreadPtr self, XTDatabaseHPtr db, u_int curr_writer_total, xtBool force_checkpoint)
 
2730
{
 
2731
        XTTableHPtr                     tab = NULL;
 
2732
        xtTableID                       tab_id;
 
2733
        int                                     flush_bit;
 
2734
        int                                     start_count = 0;
 
2735
 
 
2736
        /* Start a checkpoint (if one is not already running): */
 
2737
        if (!xt_begin_checkpoint(db, FALSE, self))
 
2738
                xt_throw(self);
 
2739
 
 
2740
        while (!self->t_quit) {
 
2741
                if (!xres_get_next_to_flush(db, &tab_id, &flush_bit, FALSE, TRUE))
 
2742
                        goto end_checkpoint;
 
2743
 
 
2744
                if (!tab_id) {
 
2745
                        if (!xt_wait_for_async_task_results(self))
 
2746
                                goto failed;
 
2747
                        start_count = 0;
 
2748
                        continue;
 
2749
                }
 
2750
 
 
2751
                if (tab) {
 
2752
                        if (tab->tab_id != tab_id) {
 
2753
                                xt_heap_release(self, tab);
 
2754
                                tab = NULL;
 
2755
                        }
 
2756
                }
 
2757
 
 
2758
                if (!tab) {
 
2759
                        if (!(tab = xt_use_table_by_id(self, db, tab_id, NULL))) {
 
2760
                                xt_throw_ulxterr(XT_CONTEXT, XT_ERR_TABLE_NOT_FOUND, (u_long) tab_id);
 
2761
                                goto failed;
 
2762
                        }
 
2763
                }
 
2764
 
 
2765
                if (flush_bit == XT_CPT_REC_ROW_FLUSHED) {
 
2766
                        if (!xt_async_flush_record_row(tab, TRUE, self))
 
2767
                                goto failed;
 
2768
                }
 
2769
                else {
 
2770
                        ASSERT(flush_bit == XT_CPT_INDEX_FLUSHED);
 
2771
                        if (!xt_async_flush_indices(tab, TRUE, FALSE, self))
 
2772
                                goto failed;
 
2773
                }
 
2774
                start_count++;
 
2775
 
 
2776
                if (!force_checkpoint) {
 
2777
                        if (curr_writer_total != db->db_xn_total_writer_count)
 
2778
                                goto end_checkpoint;
 
2779
                }
 
2780
 
 
2781
                if (start_count >= XT_ASYNC_THREAD_FLUSH_LIMIT) {
 
2782
                        if (!xt_wait_for_async_task_results(self))
 
2783
                                goto failed;
 
2784
                        start_count = 0;
 
2785
                }
 
2786
        }
 
2787
 
 
2788
        end_checkpoint:
 
2789
        if (tab)
 
2790
                xt_heap_release(self, tab);
 
2791
 
 
2792
        if (!xt_wait_for_async_task_results(self))
 
2793
                xt_throw(self);
 
2794
 
 
2795
        if (!xt_end_checkpoint(db, self, NULL))
 
2796
                xt_throw(self);
 
2797
 
 
2798
        return;
 
2799
        
 
2800
        failed:
 
2801
        if (tab)
 
2802
                xt_heap_release(self, tab);
 
2803
        xt_throw(self);
 
2804
}
 
2805
 
 
2806
/*
 
2807
 * This function performs table flush, as long as the system is idle.
 
2808
 */
 
2809
static void xres_cp_checkpoint(XTThreadPtr self, XTDatabaseHPtr db, u_int curr_writer_total, xtBool force_checkpoint)
 
2810
{
 
2811
        XTOpenTablePtr                  ot = NULL;
 
2812
        xtTableID                               tab_id;
 
2813
        int                                             flush_bit;
 
2814
        off_t                                   bytes_flushed = 0;
 
2815
        int                                             check_type;
 
2816
 
 
2817
#ifdef NEVER_CHECKPOINT
 
2818
        if (no_checkpoint)
 
2819
                return FALSE;
 
2820
#endif
 
2821
        if (force_checkpoint) {
 
2822
                if (db->db_restart.xres_cp_required)
 
2823
                        check_type = XT_CHECKPOINT_NO_PAUSE;
 
2824
                else
 
2825
                        check_type = XT_CHECKPOINT_PAUSE_IF_ACTIVITY;
 
2826
        }
 
2827
        else
 
2828
                check_type = XT_CHECKPOINT_IF_NO_ACTIVITY;      
 
2829
 
 
2830
        /* Start a checkpoint: */
 
2831
        if (!xt_begin_checkpoint(db, FALSE, self))
 
2832
                xt_throw(self);
 
2833
 
 
2834
        while (!self->t_quit) {
 
2835
                if (!xres_get_next_to_flush(db, &tab_id, &flush_bit, TRUE, FALSE))
 
2836
                        goto end_checkpoint;
 
2837
 
 
2838
                if (ot) {
 
2839
                        if (ot->ot_table->tab_id != tab_id) {
 
2840
                                xt_db_return_table_to_pool(self, ot);
 
2841
                                ot = NULL;
 
2842
                        }
 
2843
                }
 
2844
 
 
2845
                if (!ot)
 
2846
                        ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE);
 
2847
 
 
2848
                if (ot) {
 
2849
                        if (flush_bit == XT_CPT_REC_ROW_FLUSHED) {
 
2850
                                if (!xt_flush_record_row(ot, &bytes_flushed, FALSE)) {
 
2851
                                        xt_db_return_table_to_pool(self, ot);
 
2852
                                        xt_throw(self);
 
2853
                                }
 
2854
                        }
 
2855
                        else {
 
2856
                                ASSERT(flush_bit == XT_CPT_INDEX_FLUSHED);
 
2857
                                if (!xt_flush_indices(ot, &bytes_flushed, FALSE, NULL)) {
 
2858
                                        xt_db_return_table_to_pool(self, ot);
 
2859
                                        xt_throw(self);
 
2860
                                }
 
2861
                        }
 
2862
                }
 
2863
                else {
 
2864
                        /* The table was not found. Question, should I just complete the checkpoint,
 
2865
                         * or should I cause an error?
 
2866
                         *
 
2867
                         * At the moment, I print the error and warning, and continue with the checkpoint.
 
2868
                         */
 
2869
                        if (self->t_exception.e_xt_err)
 
2870
                                xt_log_and_clear_exception(self);
 
2871
                        xt_logf(XT_NT_WARNING, "Checkpoint skipping table (ID) %lu: table was not found\n", (u_long) tab_id);
 
2872
                        xt_checkpoint_set_flush_state(db, tab_id, XT_CPT_STATE_DONE_ALL);
 
2873
                }
 
2874
 
 
2875
                switch (check_type) {
 
2876
                        case XT_CHECKPOINT_IF_NO_ACTIVITY:
 
2877
                                if (bytes_flushed > 0 && curr_writer_total != db->db_xn_total_writer_count)
 
2878
                                        goto end_checkpoint;
 
2879
                                break;
 
2880
                        case XT_CHECKPOINT_PAUSE_IF_ACTIVITY:
 
2881
                                if (bytes_flushed > 2 * 1024 * 1024 && curr_writer_total != db->db_xn_total_writer_count) {
 
2882
                                        curr_writer_total = db->db_xn_total_writer_count;
 
2883
                                        bytes_flushed = 0;
 
2884
                                        xt_sleep_milli_second(400);
 
2885
                                }
 
2886
                                break;
 
2887
                        case XT_CHECKPOINT_NO_PAUSE:
 
2888
                                break;
 
2889
                }
 
2890
        }
 
2891
 
 
2892
        end_checkpoint:
 
2893
        if (ot)
 
2894
                xt_db_return_table_to_pool(self, ot);
 
2895
 
 
2896
        if (!xt_end_checkpoint(db, self, NULL))
 
2897
                xt_throw(self);
 
2898
}
 
2899
 
 
2900
 
 
2901
/* Wait for the log writer to tell us to do something.
 
2902
 */
 
2903
static void xres_cp_wait_for_log_writer(XTThreadPtr self, XTDatabaseHPtr db, u_long milli_secs)
 
2904
{
 
2905
        xt_lock_mutex(self, &db->db_cp_lock);
 
2906
        pushr_(xt_unlock_mutex, &db->db_cp_lock);
 
2907
        if (!self->t_quit)
 
2908
                xt_timed_wait_cond(self, &db->db_cp_cond, &db->db_cp_lock, milli_secs);
 
2909
        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
 
2910
}
 
2911
 
 
2912
static void xres_flush_indices(XTThreadPtr self, XTDatabaseHPtr db)
 
2913
{
 
2914
        u_int                   edx;
 
2915
        XTTableEntryPtr te_ptr;
 
2916
        XTTableHPtr             tab;
 
2917
 
 
2918
        if (!xt_db_index_dirty_threshold || xt_get_index_cache_dirty_perc() < xt_db_index_dirty_threshold)
 
2919
                return;
 
2920
 
 
2921
        /* Flush all indexes: */
 
2922
        xt_enum_tables_init(&edx);
 
2923
        while ((te_ptr = xt_enum_tables_next(NULL, db, &edx))) {
 
2924
                if ((tab = te_ptr->te_table)) {
 
2925
                        if (!xt_async_flush_indices(tab, FALSE, FALSE, self))
 
2926
                                xt_throw(self);
 
2927
                }
 
2928
        }
 
2929
}
 
2930
 
 
2931
/*
 
2932
 * This is the way checkpoint works:
 
2933
 *
 
2934
 * To write a checkpoint we need to flush all tables in
 
2935
 * the database.
 
2936
 *
 
2937
 * Before flushing the first table we get the checkpoint
 
2938
 * log position.
 
2939
 *
 
2940
 * After flushing all files we write of the checkpoint
 
2941
 * log position.
 
2942
 */
 
2943
static void xres_cp_main(XTThreadPtr self)
 
2944
{
 
2945
        XTDatabaseHPtr          db = self->st_database;
 
2946
        u_int                           curr_writer_total;
 
2947
        time_t                          now, last_index_flush;
 
2948
        xtBool                          async = TRUE;
 
2949
        xtXactID                        sweep_count;
 
2950
 
 
2951
        xt_set_low_priority(self);
 
2952
 
 
2953
        xt_db_approximate_time = time(NULL);
 
2954
        last_index_flush = xt_db_approximate_time;
 
2955
        while (!self->t_quit) {
 
2956
                /* Wait 2 seconds: */
 
2957
                curr_writer_total = db->db_xn_total_writer_count;
 
2958
                xt_db_approximate_time = time(NULL);
 
2959
                now = xt_db_approximate_time;
 
2960
                while (!self->t_quit && xt_db_approximate_time < now + 2 && !db->db_restart.xres_cp_required) {
 
2961
                        xres_cp_wait_for_log_writer(self, db, 400);
 
2962
                        xt_db_approximate_time = time(NULL);
 
2963
                        xt_db_free_unused_open_tables(self, db);
 
2964
                        xt_db_approximate_time = time(NULL);
 
2965
                        if (xt_db_approximate_time > last_index_flush + 2) {
 
2966
                                last_index_flush = xt_db_approximate_time;
 
2967
                                xres_flush_indices(self, db);
 
2968
                        }
 
2969
                }
 
2970
                
 
2971
                if (self->t_quit)
 
2972
                        break;
 
2973
 
 
2974
                /* If there is a long running transaction, then there is actitivity! 
 
2975
                 * And if the sweeper is not idle, then there is also activity!
 
2976
                 * And if the writer is active, then we should wait for it as well!
 
2977
                 */
 
2978
#ifdef XT_SWEEPER_SORT_XACTS
 
2979
                sweep_count = db->db_xn_curr_id + 1 - db->db_sw_to_add + db->db_sw_list_size;
 
2980
#else
 
2981
                sweep_count = db->db_xn_curr_id + 1 - db->db_xn_to_clean_id;
 
2982
#endif
 
2983
                if (!db->db_xn_long_running_count && 
 
2984
                        curr_writer_total == db->db_xn_total_writer_count && 
 
2985
                        !sweep_count &&
 
2986
                        db->db_sw_idle == XT_THREAD_IDLE && 
 
2987
                        db->db_wr_idle == XT_THREAD_IDLE) {
 
2988
                        /* No activity in 2 seconds: */
 
2989
                        if (async)
 
2990
                                xres_cp_async_checkpoint(self, db, curr_writer_total, TRUE);
 
2991
                        else
 
2992
                                xres_cp_checkpoint(self, db, curr_writer_total, FALSE);
 
2993
                }
 
2994
                else {
 
2995
                        /* There server is busy, check if we need to
 
2996
                         * write a checkpoint anyway...
 
2997
                         */
 
2998
                        if (db->db_restart.xres_cp_required ||
 
2999
                                db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
 
3000
                                /* Flush tables, until the checkpoint is complete. */
 
3001
                                if (db->db_restart.xres_is_checkpoint_pending(db->db_wr_log_id, db->db_wr_log_offset)) {
 
3002
                                        /* Wait until the writer has done enough for the checkpoint: */
 
3003
                                        if (async)
 
3004
                                                xres_cp_async_checkpoint(self, db, curr_writer_total, TRUE);
 
3005
                                        else
 
3006
                                                xres_cp_checkpoint(self, db, curr_writer_total, TRUE);
 
3007
                                }
 
3008
                                else {
 
3009
                                        /* Wake the writer if necessary: */
 
3010
                                        if (db->db_wr_idle == XT_THREAD_IDLE) {
 
3011
                                                if (!xt_broadcast_cond_ns(&db->db_wr_cond))
 
3012
                                                        xt_log_and_clear_exception_ns();
 
3013
                                        }
 
3014
                                }
 
3015
                        }
 
3016
                }
 
3017
 
 
3018
                if (curr_writer_total == db->db_xn_total_writer_count) {
 
3019
                        /* We did a checkpoint, and still, nothing has
 
3020
                         * happened....
 
3021
                         *
 
3022
                         * Wait for something to happen:
 
3023
                         */
 
3024
                        xtLogID         log_id;
 
3025
                        xtLogOffset     log_offset;
 
3026
 
 
3027
                        xt_db_approximate_time = time(NULL);
 
3028
                        while (!self->t_quit && curr_writer_total == db->db_xn_total_writer_count) {
 
3029
                                /* The writer position: */
 
3030
                                xt_lock_mutex(self, &db->db_wr_lock);
 
3031
                                pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
3032
                                log_id = db->db_wr_log_id;
 
3033
                                log_offset = db->db_wr_log_offset;
 
3034
                                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
3035
 
 
3036
                                /* This condition means we could checkpoint: */
 
3037
                                if (!(xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
 
3038
                                        xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
 
3039
                                        xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0) &&
 
3040
                                        xt_sl_get_size(db->db_xn_xa_list) == 0)
 
3041
                                        break;
 
3042
 
 
3043
                                xres_cp_wait_for_log_writer(self, db, 400);
 
3044
                                xt_db_approximate_time = time(NULL);
 
3045
                                xt_db_free_unused_open_tables(self, db);
 
3046
                                xt_db_approximate_time = time(NULL);
 
3047
                                if (xt_db_approximate_time > last_index_flush + 2) {
 
3048
                                        last_index_flush = xt_db_approximate_time;
 
3049
                                        xres_flush_indices(self, db);
 
3050
                                }
 
3051
                        }
 
3052
                }
 
3053
        }
 
3054
}
 
3055
 
 
3056
static void *xres_cp_run_thread(XTThreadPtr self)
 
3057
{
 
3058
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
 
3059
        int                             count;
 
3060
        void                    *mysql_thread;
 
3061
 
 
3062
        if (!(mysql_thread = myxt_create_thread()))
 
3063
                xt_throw(self);
 
3064
 
 
3065
        while (!self->t_quit) {
 
3066
                try_(a) {
 
3067
                        /*
 
3068
                         * The garbage collector requires that the database
 
3069
                         * is in use because.
 
3070
                         */
 
3071
                        xt_use_database(self, db, XT_FOR_CHECKPOINTER);
 
3072
 
 
3073
                        /* This action is both safe and required (see {BACKGROUND-RELEASE-DB}) */
 
3074
                        xt_heap_release(self, self->st_database);
 
3075
 
 
3076
                        xres_cp_main(self);
 
3077
                }
 
3078
                catch_(a) {
 
3079
                        /* This error is "normal"! */
 
3080
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
 
3081
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
 
3082
                                self->t_exception.e_sys_err == SIGTERM))
 
3083
                                xt_log_and_clear_exception(self);
 
3084
                }
 
3085
                cont_(a);
 
3086
 
 
3087
                /* Avoid releasing the database (done above) */
 
3088
                self->st_database = NULL;
 
3089
                xt_unuse_database(self, self);
 
3090
 
 
3091
                /* After an exception, pause before trying again... */
 
3092
                /* Number of seconds */
 
3093
                count = 60;
 
3094
                while (!self->t_quit && count > 0) {
 
3095
                        sleep(1);
 
3096
                        count--;
 
3097
                }
 
3098
        }
 
3099
 
 
3100
   /*
 
3101
        * {MYSQL-THREAD-KILL}
 
3102
        myxt_destroy_thread(mysql_thread, TRUE);
 
3103
        */
 
3104
        return NULL;
 
3105
}
 
3106
 
 
3107
static void xres_cp_free_thread(XTThreadPtr self, void *data)
 
3108
{
 
3109
        XTDatabaseHPtr db = (XTDatabaseHPtr) data;
 
3110
 
 
3111
        if (db->db_cp_thread) {
 
3112
                xt_lock_mutex(self, &db->db_cp_lock);
 
3113
                pushr_(xt_unlock_mutex, &db->db_cp_lock);
 
3114
                db->db_cp_thread = NULL;
 
3115
                freer_(); // xt_unlock_mutex(&db->db_cp_lock)
 
3116
        }
 
3117
}
 
3118
 
 
3119
/* Start a checkpoint, if none has been started. */
 
3120
xtPublic xtBool xt_begin_checkpoint(XTDatabaseHPtr db, xtBool have_table_lock, XTThreadPtr thread)
 
3121
{
 
3122
        XTCheckPointStatePtr    cp = &db->db_cp_state;
 
3123
        xtLogID                                 log_id;
 
3124
        xtLogOffset                             log_offset;
 
3125
        xtLogID                                 ind_rec_log_id;
 
3126
        xtLogOffset                             ind_rec_log_offset;
 
3127
        u_int                                   edx;
 
3128
        XTTableEntryPtr                 te_ptr;
 
3129
        XTTableHPtr                             tab;
 
3130
        XTOperationPtr                  op;
 
3131
        XTCheckPointTableRec    cpt;
 
3132
        XTSortedListPtr                 tables = NULL;
 
3133
        
 
3134
        /* during startup we can get an error before the checkpointer is inited */
 
3135
        if (!cp->cp_inited)
 
3136
                return FAILED;
 
3137
 
 
3138
        /* First check if a checkpoint is already running: */
 
3139
        xt_lock_mutex_ns(&cp->cp_state_lock);
 
3140
        if (cp->cp_running) {
 
3141
                xt_unlock_mutex_ns(&cp->cp_state_lock);
 
3142
                return OK;
 
3143
        }
 
3144
        if (cp->cp_table_ids) {
 
3145
                xt_free_sortedlist(NULL, cp->cp_table_ids);
 
3146
                cp->cp_table_ids = NULL;
 
3147
        }
 
3148
        xt_unlock_mutex_ns(&cp->cp_state_lock);
 
3149
        
 
3150
        /* Flush the log before we continue. This is to ensure that
 
3151
         * before we write a checkpoint, that the changes
 
3152
         * done by the sweeper and the compactor, have been
 
3153
         * applied.
 
3154
         *
 
3155
         * Note, the sweeper does not flush the log, so this is
 
3156
         * necessary!
 
3157
         *
 
3158
         * --- I have removed this flush. It is actually just a
 
3159
         * minor optimisation, which pushes the flush position
 
3160
         * below ahead.
 
3161
         *
 
3162
         * Note that the writer position used for the checkpoint
 
3163
         * _will_ be behind the current log flush position.
 
3164
         *
 
3165
         * This is because the writer cannot apply log changes
 
3166
         * until they are flushed.
 
3167
         */
 
3168
        /* This is an alternative to the above.
 
3169
        if (!xt_xlog_flush_log(db, self))
 
3170
                xt_throw(self);
 
3171
        */
 
3172
        xt_lock_mutex_ns(&db->db_wr_lock);
 
3173
 
 
3174
        /* The theoretical maximum restart log postion, is the
 
3175
         * position of the writer thread:
 
3176
         */
 
3177
        log_id = db->db_wr_log_id;
 
3178
        log_offset = db->db_wr_log_offset;
 
3179
 
 
3180
        ind_rec_log_id = db->db_xlog.xl_flush_log_id;
 
3181
        ind_rec_log_offset = db->db_xlog.xl_flush_log_offset;
 
3182
 
 
3183
        xt_unlock_mutex_ns(&db->db_wr_lock);
 
3184
 
 
3185
        /* Go through all the transactions, and find
 
3186
         * the lowest log start position of all the transactions.
 
3187
         */
 
3188
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
 
3189
                XTXactSegPtr    seg;
 
3190
 
 
3191
                seg = &db->db_xn_idx[i];
 
3192
                XT_XACT_READ_LOCK(&seg->xs_tab_lock, self);
 
3193
                for (u_int j=0; j<XT_XN_HASH_TABLE_SIZE; j++) {
 
3194
                        XTXactDataPtr   xact;
 
3195
                        
 
3196
                        xact = seg->xs_table[j];
 
3197
                        while (xact) {
 
3198
                                /* If the transaction is logged, but not cleaned: */
 
3199
                                if ((xact->xd_flags & (XT_XN_XAC_LOGGED | XT_XN_XAC_CLEANED)) == XT_XN_XAC_LOGGED) {
 
3200
                                        if (xt_comp_log_pos(log_id, log_offset, xact->xd_begin_log, xact->xd_begin_offset) > 0) {
 
3201
                                                log_id = xact->xd_begin_log;
 
3202
                                                log_offset = xact->xd_begin_offset;
 
3203
                                        }
 
3204
                                }
 
3205
                                xact = xact->xd_next_xact;
 
3206
                        }
 
3207
                }
 
3208
                XT_XACT_UNLOCK(&seg->xs_tab_lock, self, FALSE);
 
3209
        }
 
3210
 
 
3211
#ifdef TRACE_CHECKPOINT
 
3212
        printf("BEGIN CHECKPOINT %d-%llu\n", (int) log_id, (u_llong) log_offset);
 
3213
#endif
 
3214
        /* Go through all tables, and find the lowest log position.
 
3215
         * The log position stored by each table shows the position of
 
3216
         * the next operation that still needs to be applied.
 
3217
         *
 
3218
         * This comes from the list of operations which are
 
3219
         * queued for the table.
 
3220
         *
 
3221
         * This function also builds a list of tables!
 
3222
         */
 
3223
 
 
3224
        if (!(tables = xt_new_sortedlist_ns(sizeof(XTCheckPointTableRec), 20, xres_comp_flush_tabs, NULL, NULL)))
 
3225
                return FAILED;
 
3226
 
 
3227
        xt_enum_tables_init(&edx);
 
3228
        if (!have_table_lock)
 
3229
                xt_ht_lock(NULL, db->db_tables);
 
3230
        while ((te_ptr = xt_enum_tables_next(NULL, db, &edx))) {
 
3231
                if ((tab = te_ptr->te_table)) {
 
3232
                        xt_sl_lock_ns(tab->tab_op_list, thread);
 
3233
                        if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
 
3234
                                if (xt_comp_log_pos(log_id, log_offset, op->or_log_id, op->or_log_offset) > 0) {
 
3235
                                        log_id = op->or_log_id;
 
3236
                                        log_offset = op->or_log_offset;
 
3237
                                }
 
3238
                        }
 
3239
                        xt_sl_unlock(NULL, tab->tab_op_list);
 
3240
                        cpt.cpt_flushed = 0;
 
3241
                        cpt.cpt_tab_id = tab->tab_id;
 
3242
#ifdef TRACE_CHECKPOINT
 
3243
                        printf("CHECKPOINT will flush: %d %s\n", (int) tab->tab_id, tab->tab_name->ps_path);
 
3244
#endif
 
3245
                        if (!xt_sl_insert(NULL, tables, &tab->tab_id, &cpt)) {
 
3246
                                if (!have_table_lock)
 
3247
                                        xt_ht_unlock(NULL, db->db_tables);
 
3248
                                xt_free_sortedlist(NULL, tables);
 
3249
                                return FAILED;
 
3250
                        }
 
3251
                }
 
3252
        }
 
3253
        if (!have_table_lock)
 
3254
                xt_ht_unlock(NULL, db->db_tables);
 
3255
 
 
3256
        xt_lock_mutex_ns(&cp->cp_state_lock);
 
3257
        /* If there is a table list, then someone was faster than me! */
 
3258
        /* NOTE: log_offset can be zero, and valid!
 
3259
         * So we only check the log_id.
 
3260
         */
 
3261
        if (!cp->cp_running && log_id) {
 
3262
                cp->cp_running = TRUE;
 
3263
                cp->cp_log_id = log_id;
 
3264
                cp->cp_log_offset = log_offset;
 
3265
 
 
3266
                cp->cp_ind_rec_log_id = ind_rec_log_id;
 
3267
                cp->cp_ind_rec_log_offset = ind_rec_log_offset;
 
3268
 
 
3269
                cp->cp_flush_count = 0;
 
3270
                cp->cp_next_to_flush = 0;
 
3271
                cp->cp_table_ids = tables;
 
3272
        }
 
3273
        else
 
3274
                xt_free_sortedlist(NULL, tables);
 
3275
        xt_unlock_mutex_ns(&cp->cp_state_lock);
 
3276
 
 
3277
        /* At this point, log flushing can begin... */
 
3278
        return OK;
 
3279
}
 
3280
 
 
3281
/*
 
3282
 * This function returns TRUE if there is still something to flush according to the current
 
3283
 * checkpoint.
 
3284
 *
 
3285
 * If so, it returns the table ID and a bit indicating whether the table data, or the
 
3286
 * index should be flushed.
 
3287
 *
 
3288
 * Note, the function also checks if an asynchronous flush is in progress. It will not return
 
3289
 * a reference to a flush that is already in progress.
 
3290
 */
 
3291
static xtBool xres_get_next_to_flush(XTDatabaseHPtr db, xtTableID *tab_id, int *flush_bit, xtBool skip_busy, xtBool rec_first)
 
3292
{
 
3293
        XTCheckPointStatePtr    cp = &db->db_cp_state;
 
3294
        XTCheckPointTablePtr    cp_tab;
 
3295
        size_t                                  table_count, tab_idx;
 
3296
        xtBool                                  flush_index;
 
3297
 
 
3298
        xt_lock_mutex_ns(&cp->cp_state_lock);
 
3299
        if (!cp->cp_running || !cp->cp_table_ids)
 
3300
                goto flush_complete;
 
3301
 
 
3302
        if (!(table_count = xt_sl_get_size(cp->cp_table_ids)))
 
3303
                goto flush_complete;
 
3304
 
 
3305
        ASSERT_NS(cp->cp_flush_count <= table_count);
 
3306
        if (cp->cp_flush_count >= table_count)
 
3307
                goto flush_complete;
 
3308
        
 
3309
        /* Try all tables: */
 
3310
        for (size_t i=0; i<table_count; i++) {
 
3311
                if (cp->cp_next_to_flush > table_count*2) {
 
3312
                        cp->cp_next_to_flush = 0;
 
3313
                        if (!skip_busy) {
 
3314
                                /* Output a dummy no table, to indicate we are wrapping! */
 
3315
                                *tab_id = 0;
 
3316
                                goto flush_table;
 
3317
                        }
 
3318
                }
 
3319
 
 
3320
                if (rec_first) {
 
3321
                        /* First all record/row files are flushed, then
 
3322
                         * the index files.
 
3323
                         */
 
3324
                        if (cp->cp_next_to_flush < table_count) {
 
3325
                                tab_idx = cp->cp_next_to_flush;
 
3326
                                flush_index = FALSE; 
 
3327
                        }
 
3328
                        else {
 
3329
                                tab_idx = cp->cp_next_to_flush - table_count;
 
3330
                                flush_index = TRUE; 
 
3331
                        }
 
3332
                }
 
3333
                else {
 
3334
                        /* Flush each table, one after the other,
 
3335
                         * record/row file followed by index file:
 
3336
                         */
 
3337
                        tab_idx = cp->cp_next_to_flush / 2;
 
3338
                        flush_index = (cp->cp_next_to_flush % 2) != 0;
 
3339
                }
 
3340
 
 
3341
                cp->cp_next_to_flush++;
 
3342
 
 
3343
                if ((cp_tab = (XTCheckPointTablePtr) xt_sl_item_at(cp->cp_table_ids, tab_idx))) {
 
3344
                        *tab_id = cp_tab->cpt_tab_id;
 
3345
                        if (flush_index) {
 
3346
                                if (!(cp_tab->cpt_flushed & XT_CPT_INDEX_FLUSHED)) {
 
3347
                                        if (!skip_busy || !(cp_tab->cpt_flushed & XT_CPT_INDEX_FLUSHING)) {
 
3348
                                                *flush_bit = XT_CPT_INDEX_FLUSHED;
 
3349
                                                goto flush_table;
 
3350
                                        }
 
3351
                                }
 
3352
                        }
 
3353
                        else {
 
3354
                                if (!(cp_tab->cpt_flushed & XT_CPT_REC_ROW_FLUSHED)) {
 
3355
                                        if (!skip_busy || !(cp_tab->cpt_flushed & XT_CPT_REC_ROW_FLUSHING)) {
 
3356
                                                *flush_bit = XT_CPT_REC_ROW_FLUSHED;
 
3357
                                                goto flush_table;
 
3358
                                        }
 
3359
                                }
 
3360
                        }
 
3361
                }
 
3362
        }
 
3363
 
 
3364
        flush_complete:
 
3365
        xt_unlock_mutex_ns(&cp->cp_state_lock);
 
3366
        return FALSE;
 
3367
 
 
3368
        flush_table:
 
3369
        xt_unlock_mutex_ns(&cp->cp_state_lock);
 
3370
        return TRUE;
 
3371
}
 
3372
 
 
3373
xtPublic void xt_checkpoint_set_flush_state(XTDatabaseHPtr db, xtTableID tab_id, int state)
 
3374
{
 
3375
        XTCheckPointStatePtr    cp = &db->db_cp_state;
 
3376
        XTCheckPointTablePtr    cp_tab;
 
3377
        int                                             flush_bit = 0;
 
3378
 
 
3379
        xt_lock_mutex_ns(&cp->cp_state_lock);
 
3380
        if (cp->cp_running) {
 
3381
                cp_tab = (XTCheckPointTablePtr) xt_sl_find(NULL, cp->cp_table_ids, &tab_id);
 
3382
                if (cp_tab) {
 
3383
                        switch (state) {
 
3384
                                case XT_CPT_STATE_START_REC_ROW:
 
3385
                                        cp_tab->cpt_flushed |= XT_CPT_REC_ROW_FLUSHING;
 
3386
                                        break;
 
3387
                                case XT_CPT_STATE_STOP_REC_ROW:
 
3388
                                        cp_tab->cpt_flushed &= ~XT_CPT_REC_ROW_FLUSHING;
 
3389
                                        break;
 
3390
                                case XT_CPT_STATE_DONE_REC_ROW:
 
3391
                                        cp_tab->cpt_flushed &= ~XT_CPT_REC_ROW_FLUSHING;
 
3392
                                        flush_bit = XT_CPT_REC_ROW_FLUSHED;
 
3393
                                        break;
 
3394
                                case XT_CPT_STATE_START_INDEX:
 
3395
                                        cp_tab->cpt_flushed |= XT_CPT_INDEX_FLUSHING;
 
3396
                                        break;
 
3397
                                case XT_CPT_STATE_STOP_INDEX:
 
3398
                                        cp_tab->cpt_flushed &= ~XT_CPT_INDEX_FLUSHING;
 
3399
                                        break;
 
3400
                                case XT_CPT_STATE_DONE_INDEX:
 
3401
                                        cp_tab->cpt_flushed &= ~XT_CPT_INDEX_FLUSHING;
 
3402
                                        flush_bit = XT_CPT_INDEX_FLUSHED;
 
3403
                                        break;
 
3404
                                case XT_CPT_STATE_DONE_ALL:
 
3405
                                        cp_tab->cpt_flushed &= ~(XT_CPT_REC_ROW_FLUSHING | XT_CPT_INDEX_FLUSHING);
 
3406
                                        flush_bit = XT_CPT_REC_ROW_FLUSHED | XT_CPT_INDEX_FLUSHED;
 
3407
                                        break;
 
3408
                        }
 
3409
 
 
3410
                        if (flush_bit && ((cp_tab->cpt_flushed & XT_CPT_ALL_FLUSHED) != XT_CPT_ALL_FLUSHED)) {
 
3411
                                cp_tab->cpt_flushed |= flush_bit;
 
3412
                                if ((cp_tab->cpt_flushed & XT_CPT_ALL_FLUSHED) == XT_CPT_ALL_FLUSHED) {
 
3413
                                        ASSERT_NS(cp->cp_flush_count < xt_sl_get_size(cp->cp_table_ids));
 
3414
                                        cp->cp_flush_count++;
 
3415
                                }
 
3416
                        }
 
3417
                }
 
3418
        }
 
3419
        xt_unlock_mutex_ns(&cp->cp_state_lock);
 
3420
}
 
3421
 
 
3422
/* End a checkpoint, if a checkpoint has been started,
 
3423
 * and all checkpoint tables have been flushed
 
3424
 */
 
3425
xtPublic xtBool xt_end_checkpoint(XTDatabaseHPtr db, XTThreadPtr thread, xtBool *checkpoint_done)
 
3426
{
 
3427
        XTCheckPointStatePtr    cp = &db->db_cp_state;
 
3428
        XTXlogCheckpointDPtr    cp_buf = NULL;
 
3429
        char                                    path[PATH_MAX];
 
3430
        XTOpenFilePtr                   of;
 
3431
        u_int                                   table_count;
 
3432
        size_t                                  chk_size = 0; 
 
3433
        u_int                                   no_of_logs = 0; 
 
3434
 
 
3435
        /* As long as we have outstanding XA transactions, we may not checkpoint! */
 
3436
        if (xt_sl_get_size(db->db_xn_xa_list) > 0) {
 
3437
#ifdef DEBUG
 
3438
                printf("Checkpoint must wait\n");
 
3439
#endif
 
3440
                return OK;
 
3441
        }
 
3442
 
 
3443
#ifdef NEVER_CHECKPOINT
 
3444
        return OK;
 
3445
#endif
 
3446
        /* Lock the checkpoint state so that only on thread can do this! */
 
3447
        xt_lock_mutex_ns(&cp->cp_state_lock);
 
3448
        if (!cp->cp_running)
 
3449
                goto checkpoint_complete;
 
3450
 
 
3451
        table_count = 0;
 
3452
        if (cp->cp_table_ids)
 
3453
                table_count = xt_sl_get_size(cp->cp_table_ids);
 
3454
        if (cp->cp_flush_count < table_count) {
 
3455
                /* Checkpoint is not done, yet! */
 
3456
                xt_unlock_mutex_ns(&cp->cp_state_lock);
 
3457
                if (checkpoint_done)
 
3458
                        *checkpoint_done = FALSE;
 
3459
                return OK;
 
3460
        }
 
3461
 
 
3462
        /* Check if anything has changed since the last checkpoint,
 
3463
         * if not, there is no need to write a new checkpoint!
 
3464
         */
 
3465
        if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
 
3466
                xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
 
3467
                xt_comp_log_pos(cp->cp_log_id, cp->cp_log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0) {
 
3468
                /* A checkpoint is required if the size of the deleted
 
3469
                 * list is not zero. The reason is, I cannot remove the
 
3470
                 * logs from the deleted list BEFORE a checkpoint has been
 
3471
                 * done which does NOT include these logs.
 
3472
                 *
 
3473
                 * Even though the logs have already been deleted. They
 
3474
                 * remain on the deleted list to ensure that they are NOT
 
3475
                 * reused during this time, until the next checkpoint.
 
3476
                 *
 
3477
                 * This is done because if they are used, then on restart
 
3478
                 * they would be deleted!
 
3479
                 */
 
3480
#ifdef TRACE_CHECKPOINT
 
3481
                printf("--- END CHECKPOINT - no write\n");
 
3482
#endif
 
3483
                goto checkpoint_complete;
 
3484
        }
 
3485
 
 
3486
#ifdef TRACE_CHECKPOINT
 
3487
        printf("--- END CHECKPOINT - write start point\n");
 
3488
#endif
 
3489
        xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
 
3490
 
 
3491
        no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_to_delete);
 
3492
        chk_size = offsetof(XTXlogCheckpointDRec, xcp_del_log) + no_of_logs * 2;
 
3493
        xtLogID *log_id_ptr;
 
3494
 
 
3495
        if (!(cp_buf = (XTXlogCheckpointDPtr) xt_malloc_ns(chk_size))) {
 
3496
                xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
 
3497
                goto failed_0;
 
3498
        }
 
3499
 
 
3500
        /* Increment the checkpoint number. This value is used if 2 checkpoint have the
 
3501
         * same log number. In this case checkpoints may differ in the log files
 
3502
         * that should be deleted. Here it is important to use the most recent
 
3503
         * log file!
 
3504
         */
 
3505
        db->db_restart.xres_cp_number++;
 
3506
 
 
3507
        /* Create the checkpoint record: */
 
3508
        XT_SET_DISK_4(cp_buf->xcp_head_size_4, chk_size);
 
3509
        XT_SET_DISK_2(cp_buf->xcp_version_2, XT_CHECKPOINT_VERSION);
 
3510
        XT_SET_DISK_6(cp_buf->xcp_chkpnt_no_6, db->db_restart.xres_cp_number);
 
3511
        XT_SET_DISK_4(cp_buf->xcp_log_id_4, cp->cp_log_id);
 
3512
        XT_SET_DISK_6(cp_buf->xcp_log_offs_6, cp->cp_log_offset);
 
3513
        XT_SET_DISK_4(cp_buf->xcp_tab_id_4, db->db_curr_tab_id);
 
3514
        XT_SET_DISK_4(cp_buf->xcp_xact_id_4, db->db_xn_curr_id);
 
3515
        XT_SET_DISK_4(cp_buf->xcp_ind_rec_log_id_4, cp->cp_ind_rec_log_id);
 
3516
        XT_SET_DISK_6(cp_buf->xcp_ind_rec_log_offs_6, cp->cp_ind_rec_log_offset);
 
3517
        XT_SET_DISK_2(cp_buf->xcp_log_count_2, no_of_logs);
 
3518
 
 
3519
        for (u_int i=0; i<no_of_logs; i++) {
 
3520
                log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_to_delete, i);
 
3521
                XT_SET_DISK_2(cp_buf->xcp_del_log[i], (xtWord2) *log_id_ptr);
 
3522
        }
 
3523
 
 
3524
        XT_SET_DISK_2(cp_buf->xcp_checksum_2, xt_get_checksum(((xtWord1 *) cp_buf) + 2, chk_size - 2, 1));
 
3525
 
 
3526
        xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
 
3527
 
 
3528
        /* Write the checkpoint: */
 
3529
        db->db_restart.xres_name(PATH_MAX, path, db->db_restart.xres_next_res_no);
 
3530
        if (!(of = xt_open_file_ns(path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 16*1024*1024)))
 
3531
                goto failed_1;
 
3532
 
 
3533
        if (!xt_set_eof_file(NULL, of, 0))
 
3534
                goto failed_2;
 
3535
        if (!xt_pwrite_file(of, 0, chk_size, (xtWord1 *) cp_buf, &thread->st_statistics.st_x, thread))
 
3536
                goto failed_2;
 
3537
        if (!xt_flush_file(of, &thread->st_statistics.st_x, thread))
 
3538
                goto failed_2;
 
3539
 
 
3540
        xt_close_file_ns(of);
 
3541
 
 
3542
        /* Next time write the other restart file: */
 
3543
        db->db_restart.xres_next_res_no = (db->db_restart.xres_next_res_no % 2) + 1;
 
3544
        db->db_restart.xres_cp_log_id = cp->cp_log_id;
 
3545
        db->db_restart.xres_cp_log_offset = cp->cp_log_offset;
 
3546
        db->db_restart.xres_cp_required = FALSE;
 
3547
 
 
3548
        /*
 
3549
         * Remove all the data logs that were deleted on the
 
3550
         * last checkpoint:
 
3551
         */
 
3552
        if (!xres_remove_data_logs(db))
 
3553
                goto failed_0;
 
3554
 
 
3555
#ifndef DEBUG_KEEP_LOGS
 
3556
        /* After checkpoint, we can delete transaction logs that will no longer be required
 
3557
         * for recovery...
 
3558
         */
 
3559
        if (cp->cp_log_id > 1) {
 
3560
                xtLogID current_log_id = cp->cp_log_id;
 
3561
                xtLogID del_log_id;
 
3562
 
 
3563
#ifdef XT_NUMBER_OF_LOGS_TO_SAVE
 
3564
                if (pbxt_crash_debug) {
 
3565
                        /* To save the logs, we just consider them in use: */
 
3566
                        if (current_log_id > XT_NUMBER_OF_LOGS_TO_SAVE)
 
3567
                                current_log_id -= XT_NUMBER_OF_LOGS_TO_SAVE;
 
3568
                        else
 
3569
                                current_log_id = 1;
 
3570
                }
 
3571
#endif
 
3572
 
 
3573
                del_log_id = current_log_id - 1;
 
3574
 
 
3575
                while (del_log_id > 0) {
 
3576
                        db->db_xlog.xlog_name(PATH_MAX, path, del_log_id);
 
3577
                        if (!xt_fs_exists(path))
 
3578
                                break;
 
3579
                        del_log_id--;
 
3580
                }
 
3581
 
 
3582
                /* This was the lowest log ID that existed: */
 
3583
                del_log_id++;
 
3584
 
 
3585
                /* Delete all logs that still exist, that come before
 
3586
                 * the current log:
 
3587
                 *
 
3588
                 * Do this from least to greatest to ensure no "holes" appear.
 
3589
                 */
 
3590
                while (del_log_id < current_log_id) {
 
3591
                        switch (db->db_xlog.xlog_delete_log(del_log_id, thread)) {
 
3592
                                case OK:
 
3593
                                        break;
 
3594
                                case FAILED:
 
3595
                                        goto exit_loop;
 
3596
                                case XT_ERR:
 
3597
                                        goto failed_0;
 
3598
                        }
 
3599
                        del_log_id++;
 
3600
                }
 
3601
                exit_loop:;
 
3602
        }
 
3603
 
 
3604
        /* And we can delete data logs in the list, and place them
 
3605
         * on the deleted list.
 
3606
         */
 
3607
        xtLogID log_id;
 
3608
        for (u_int i=0; i<no_of_logs; i++) {
 
3609
                log_id = (xtLogID) XT_GET_DISK_2(cp_buf->xcp_del_log[i]);
 
3610
                if (!xres_delete_data_log(db, log_id))
 
3611
                        goto failed_0;
 
3612
        }
 
3613
#endif
 
3614
 
 
3615
        xt_free_ns(cp_buf);
 
3616
        cp_buf = NULL;
 
3617
 
 
3618
        checkpoint_complete:
 
3619
        cp->cp_running = FALSE;
 
3620
        if (cp->cp_table_ids) {
 
3621
                xt_free_sortedlist(NULL, cp->cp_table_ids);
 
3622
                cp->cp_table_ids = NULL;
 
3623
        }
 
3624
        cp->cp_flush_count = 0;
 
3625
        cp->cp_next_to_flush = 0;
 
3626
        db->db_restart.xres_cp_required = FALSE;
 
3627
        xt_unlock_mutex_ns(&cp->cp_state_lock);
 
3628
        if (checkpoint_done)
 
3629
                *checkpoint_done = TRUE;
 
3630
        return OK;
 
3631
 
 
3632
        failed_2:
 
3633
        xt_close_file_ns(of);
 
3634
 
 
3635
        failed_1:
 
3636
        xt_free_ns(cp_buf);
 
3637
 
 
3638
        failed_0:
 
3639
        if (cp_buf)
 
3640
                xt_free_ns(cp_buf);
 
3641
        xt_unlock_mutex_ns(&cp->cp_state_lock);
 
3642
        return FAILED;
 
3643
}
 
3644
 
 
3645
xtPublic xtWord8 xt_bytes_since_last_checkpoint(XTDatabaseHPtr db, xtLogID curr_log_id, xtLogOffset curr_log_offset)
 
3646
{
 
3647
        xtLogID                                 log_id;
 
3648
        xtLogOffset                             log_offset;
 
3649
        size_t                                  byte_count = 0;
 
3650
 
 
3651
        log_id = db->db_restart.xres_cp_log_id;
 
3652
        log_offset = db->db_restart.xres_cp_log_offset;
 
3653
 
 
3654
        /* Assume the logs have the threshold: */
 
3655
        if (log_id < curr_log_id) {
 
3656
                if (log_offset < xt_db_log_file_threshold)
 
3657
                        byte_count = (size_t) (xt_db_log_file_threshold - log_offset);
 
3658
                log_offset = 0;
 
3659
                log_id++;
 
3660
        }
 
3661
        while (log_id < curr_log_id) {
 
3662
                byte_count += (size_t) xt_db_log_file_threshold;
 
3663
                log_id++;
 
3664
        }
 
3665
        if (log_offset < curr_log_offset)
 
3666
                byte_count += (size_t) (curr_log_offset - log_offset);
 
3667
 
 
3668
        return byte_count;
 
3669
}
 
3670
 
 
3671
xtPublic void xt_start_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
 
3672
{
 
3673
        char name[PATH_MAX];
 
3674
 
 
3675
        sprintf(name, "CP-%s", xt_last_directory_of_path(db->db_main_path));
 
3676
        xt_remove_dir_char(name);
 
3677
        db->db_cp_thread = xt_create_daemon(self, name);
 
3678
        xt_set_thread_data(db->db_cp_thread, db, xres_cp_free_thread);
 
3679
        xt_run_thread(self, db->db_cp_thread, xres_cp_run_thread);
 
3680
}
 
3681
 
 
3682
xtPublic void xt_wait_for_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
 
3683
{
 
3684
        time_t          then, now;
 
3685
        xtBool          message = FALSE;
 
3686
        xtLogID         log_id;
 
3687
        xtLogOffset     log_offset;
 
3688
 
 
3689
        if (db->db_cp_thread) {
 
3690
                then = time(NULL);
 
3691
                for (;;) {
 
3692
                        xt_lock_mutex(self, &db->db_wr_lock);
 
3693
                        pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
3694
                        log_id = db->db_wr_log_id;
 
3695
                        log_offset = db->db_wr_log_offset;
 
3696
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
3697
 
 
3698
                        if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
 
3699
                                xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
 
3700
                                xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0)
 
3701
                                break;
 
3702
 
 
3703
                        /* Do a final checkpoint before shutdown: */
 
3704
                        db->db_restart.xres_cp_required = TRUE;
 
3705
 
 
3706
                        xt_lock_mutex(self, &db->db_cp_lock);
 
3707
                        pushr_(xt_unlock_mutex, &db->db_cp_lock);
 
3708
                        if (!xt_broadcast_cond_ns(&db->db_cp_cond)) {
 
3709
                                xt_log_and_clear_exception_ns();
 
3710
                                break;
 
3711
                        }
 
3712
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
 
3713
 
 
3714
                        xt_sleep_milli_second(10);
 
3715
 
 
3716
                        now = time(NULL);
 
3717
                        if (now >= then + 16) {
 
3718
                                xt_logf(XT_NT_INFO, "Aborting wait for '%s' checkpointer\n", db->db_name);
 
3719
                                message = FALSE;
 
3720
                                break;
 
3721
                        }
 
3722
                        if (now >= then + 2) {
 
3723
                                if (!message) {
 
3724
                                        message = TRUE;
 
3725
                                        xt_logf(XT_NT_INFO, "Waiting for '%s' checkpointer...\n", db->db_name);
 
3726
                                }
 
3727
                        }
 
3728
                }
 
3729
 
 
3730
                if (message)
 
3731
                        xt_logf(XT_NT_INFO, "Checkpointer '%s' done.\n", db->db_name);
 
3732
        }
 
3733
}
 
3734
 
 
3735
xtPublic void xt_stop_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
 
3736
{
 
3737
        XTThreadPtr thr_wr;
 
3738
 
 
3739
        if (db->db_cp_thread) {
 
3740
                xt_lock_mutex(self, &db->db_cp_lock);
 
3741
                pushr_(xt_unlock_mutex, &db->db_cp_lock);
 
3742
 
 
3743
                /* This pointer is safe as long as you have the transaction lock. */
 
3744
                if ((thr_wr = db->db_cp_thread)) {
 
3745
                        xtThreadID tid = thr_wr->t_id;
 
3746
 
 
3747
                        /* Make sure the thread quits when woken up. */
 
3748
                        xt_terminate_thread(self, thr_wr);
 
3749
 
 
3750
                        xt_wake_checkpointer(self, db);
 
3751
 
 
3752
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
 
3753
 
 
3754
                        /*
 
3755
                         * GOTCHA: This is a wierd thing but the SIGTERM directed
 
3756
                         * at a particular thread (in this case the sweeper) was
 
3757
                         * being caught by a different thread and killing the server
 
3758
                         * sometimes. Disconcerting.
 
3759
                         * (this may only be a problem on Mac OS X)
 
3760
                        xt_kill_thread(thread);
 
3761
                         */
 
3762
                        xt_wait_for_thread_to_exit(tid, FALSE);
 
3763
 
 
3764
                        /* PMC - This should not be necessary to set the signal here, but in the
 
3765
                         * debugger the handler is not called!!?
 
3766
                        thr_wr->t_delayed_signal = SIGTERM;
 
3767
                        xt_kill_thread(thread);
 
3768
                         */
 
3769
                        db->db_cp_thread = NULL;
 
3770
                }
 
3771
                else
 
3772
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
 
3773
        }
 
3774
}
 
3775
 
 
3776
xtPublic void xt_wake_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
 
3777
{
 
3778
        if (!xt_broadcast_cond_ns(&db->db_cp_cond))
 
3779
                xt_log_and_clear_exception(self);
 
3780
}
 
3781
 
 
3782
xtPublic void xt_free_writer_state(struct XTThread *self, XTWriterStatePtr ws)
 
3783
{
 
3784
        if (ws->ws_db)
 
3785
                ws->ws_db->db_xlog.xlog_seq_exit(&ws->ws_seqread);
 
3786
        xt_db_set_size(self, &ws->ws_databuf, 0);
 
3787
        xt_ib_free(self, &ws->ws_rec_buf);
 
3788
        if (ws->ws_ot) {
 
3789
                xt_db_return_table_to_pool(self, ws->ws_ot);
 
3790
                ws->ws_ot = NULL;
 
3791
        }
 
3792
}
 
3793
 
 
3794
xtPublic void xt_dump_xlogs(XTDatabaseHPtr db, xtLogID start_log)
 
3795
{
 
3796
        XTXactSeqReadRec        seq;
 
3797
        XTXactLogBufferDPtr     record;
 
3798
        xtLogID                         log_id = db->db_restart.xres_cp_log_id;
 
3799
        char                            log_path[PATH_MAX];
 
3800
        XTThreadPtr                     thread = xt_get_self();
 
3801
 
 
3802
        /* Find the first log that still exists:*/
 
3803
        for (;;) {
 
3804
                log_id--;
 
3805
                db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
 
3806
                if (!xt_fs_exists(log_path))
 
3807
                        break;
 
3808
        }
 
3809
        log_id++;
 
3810
 
 
3811
        if (!db->db_xlog.xlog_seq_init(&seq, xt_db_log_buffer_size, FALSE))
 
3812
                return;
 
3813
 
 
3814
        if (log_id < start_log)
 
3815
                log_id = start_log;
 
3816
 
 
3817
        for (;;) {
 
3818
                db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
 
3819
                if (!xt_fs_exists(log_path))
 
3820
                        break;
 
3821
 
 
3822
                if (!db->db_xlog.xlog_seq_start(&seq, log_id, 0, FALSE))
 
3823
                        goto done;
 
3824
 
 
3825
                PRINTF("---------- DUMP LOG %d\n", (int) log_id);
 
3826
                for (;;) {
 
3827
                        if (!db->db_xlog.xlog_seq_next(&seq, &record, TRUE, thread)) {
 
3828
                                PRINTF("---------- DUMP LOG %d ERROR\n", (int) log_id);
 
3829
                                xt_log_and_clear_exception_ns();
 
3830
                                break;
 
3831
                        }
 
3832
                        if (!record) {
 
3833
                                PRINTF("---------- DUMP LOG %d DONE\n", (int) log_id);
 
3834
                                break;
 
3835
                        }
 
3836
                        xt_print_log_record(seq.xseq_rec_log_id, seq.xseq_rec_log_offset, record);
 
3837
                }
 
3838
 
 
3839
                log_id++;
 
3840
        }
 
3841
 
 
3842
        done:
 
3843
        db->db_xlog.xlog_seq_exit(&seq);
 
3844
}
 
3845
 
 
3846
/* ----------------------------------------------------------------------
 
3847
 * D A T A B A S E   R E C O V E R Y   T H R E A D
 
3848
 */
 
3849
 
 
3850
 
 
3851
static XTThreadPtr              xres_recovery_thread;
 
3852
 
 
3853
static void *xn_xres_run_recovery_thread(XTThreadPtr self)
 
3854
{
 
3855
        THD *mysql_thread;
 
3856
 
 
3857
        if (!(mysql_thread = (THD *) myxt_create_thread()))
 
3858
                xt_throw(self);
 
3859
 
 
3860
//#ifdef DRIZZLED
 
3861
//      static const std::string plugin_name("PBXT");
 
3862
//
 
3863
//      while (!xres_recovery_thread->t_quit && !Registry::singleton().find(plugin_name))
 
3864
//              xt_sleep_milli_second(1);
 
3865
//#else
 
3866
//      while (!xres_recovery_thread->t_quit && !ha_resolve_by_legacy_type(mysql_thread, DB_TYPE_PBXT))
 
3867
//              xt_sleep_milli_second(1);
 
3868
//#endif
 
3869
        myxt_wait_pbxt_plugin_slot_assigned(self);
 
3870
 
 
3871
        if (!xres_recovery_thread->t_quit) {
 
3872
                try_(a) {
 
3873
                        XTDatabaseHPtr  db;
 
3874
 
 
3875
                        /* {GLOBAL-DB}
 
3876
                         * It can happen that something will just get in before this
 
3877
                         * thread and open/recover the database!
 
3878
                         */
 
3879
                        if (!pbxt_database) {
 
3880
                                xt_open_database(self, mysql_real_data_home, TRUE);
 
3881
                                /* {GLOBAL-DB}
 
3882
                                 * This can be done at the same time as the recovery thread,
 
3883
                                 * strictly speaking I need a lock.
 
3884
                                 */
 
3885
                                if (!pbxt_database) {
 
3886
                                        pbxt_database = self->st_database;
 
3887
                                        xt_heap_reference(self, pbxt_database);
 
3888
                                }
 
3889
                        }
 
3890
                        else
 
3891
                                xt_use_database(self, pbxt_database, XT_FOR_USER);
 
3892
 
 
3893
                        pbxt_recovery_state = XT_RECOVER_DONE;
 
3894
 
 
3895
                        /* {WAIT-FOR-SW-AFTER-RECOV}
 
3896
                         * Moved to here...
 
3897
                         */
 
3898
                        db = self->st_database;
 
3899
                        xt_lock_mutex(self, &db->db_init_sweep_lock);
 
3900
                        pushr_(xt_unlock_mutex, &db->db_init_sweep_lock);
 
3901
                        if (!db->db_init_sweep_done) {
 
3902
                                XTTableEntryPtr         te_ptr;
 
3903
                                XTTableHPtr                     tab;
 
3904
                                
 
3905
                                xt_init_sweeper_wait(self, db);
 
3906
                                
 
3907
                                /* Memory tables are not recovered. This loop will
 
3908
                                 * make sure that the can be used, even though they are not recovered!
 
3909
                                 */
 
3910
                                for (u_int i=0; i<xt_sl_get_size(db->db_table_by_id); i++) {
 
3911
                                        te_ptr = (XTTableEntryPtr) xt_sl_item_at(db->db_table_by_id, i);
 
3912
                                        if ((tab = te_ptr->te_table)) {
 
3913
                                                if (tab->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE) {
 
3914
                                                        tab->tab_recovery_not_done = FALSE;
 
3915
                                                        tab->tab_dic.dic_disable_index = XT_INDEX_OK;
 
3916
                                                }
 
3917
                                        }
 
3918
                                }
 
3919
                                
 
3920
                                db->db_init_sweep_done = TRUE;
 
3921
                        }
 
3922
                        freer_(); // xt_unlock_mutex(&db->db_init_sweep_lock)
 
3923
 
 
3924
                        pbxt_recovery_state = XT_RECOVER_SWEPT;
 
3925
                }
 
3926
                catch_(a) {
 
3927
                        pbxt_recovery_state = XT_RECOVER_ERROR;
 
3928
                        xt_log_and_clear_exception(self);
 
3929
                }
 
3930
                cont_(a);
 
3931
        }
 
3932
 
 
3933
   /*
 
3934
    * {MYSQL-THREAD-KILL}
 
3935
        * Here is the problem with destroying the thread at this
 
3936
        * point. If we had an error started, then it can lead
 
3937
        * to a callback into pbxt: pbxt_panic().
 
3938
        *
 
3939
        * This will shutdown things, making it impossible quite the
 
3940
        * thread and do a cleanup. Solution:
 
3941
        *
 
3942
        * Move the MySQL thread descruction to a later point!
 
3943
        *
 
3944
        * sql/mysqld --no-defaults --basedir=~/maria/trunk 
 
3945
        * --character-sets-dir=~/maria/trunk/sql/share/charsets 
 
3946
        * --language=~/maria/trunk/sql/share/english 
 
3947
        * --skip-networking --datadir=/tmp/x --skip-grant-tables --nonexistentoption 
 
3948
        *
 
3949
        * #0    0x003893f9 in xt_exit_databases at database_xt.cc:304
 
3950
        * #1    0x0039dc7e in pbxt_end at ha_pbxt.cc:947
 
3951
        * #2    0x0039dd27 in pbxt_panic at ha_pbxt.cc:1289
 
3952
        * #3    0x001d619e in ha_finalize_handlerton at handler.cc:391
 
3953
        * #4    0x00279d22 in plugin_deinitialize at sql_plugin.cc:816
 
3954
        * #5    0x0027bcf5 in reap_plugins at sql_plugin.cc:904
 
3955
        * #6    0x0027c38c in plugin_thdvar_cleanup at sql_plugin.cc:2513
 
3956
        * #7    0x000c0db2 in THD::~THD at sql_class.cc:934
 
3957
        * #8    0x003b025b in myxt_destroy_thread at myxt_xt.cc:2999
 
3958
        * #9    0x003b66b5 in xn_xres_run_recovery_thread at restart_xt.cc:3196
 
3959
        * #10   0x003cbfbb in xt_thread_main at thread_xt.cc:1020
 
3960
        *
 
3961
        myxt_destroy_thread(mysql_thread, TRUE);
 
3962
        */
 
3963
 
 
3964
        xres_recovery_thread = NULL;
 
3965
        return NULL;
 
3966
}
 
3967
 
 
3968
xtPublic void xt_xres_start_database_recovery(XTThreadPtr self)
 
3969
{
 
3970
        char name[PATH_MAX];
 
3971
 
 
3972
        sprintf(name, "DB-RECOVERY-%s", xt_last_directory_of_path(mysql_real_data_home));
 
3973
        xt_remove_dir_char(name);
 
3974
 
 
3975
        pbxt_recovery_state = XT_RECOVER_PENDING;
 
3976
        xres_recovery_thread = xt_create_daemon(self, name);
 
3977
        xt_run_thread(self, xres_recovery_thread, xn_xres_run_recovery_thread);
 
3978
}
 
3979
 
 
3980
xtPublic void xt_xres_terminate_recovery(XTThreadPtr self)
 
3981
{
 
3982
        XTThreadPtr thr_rec;
 
3983
 
 
3984
        /* {MYSQL-THREAD-KILL}
 
3985
         * Stack above shows that his is possible!
 
3986
         */
 
3987
        if ((thr_rec = xres_recovery_thread) && (self != xres_recovery_thread)) {
 
3988
                xtThreadID tid = thr_rec->t_id;
 
3989
 
 
3990
                xt_terminate_thread(self, thr_rec);
 
3991
 
 
3992
                xt_wait_for_thread_to_exit(tid, TRUE);
 
3993
        }
 
3994
}
 
3995
 
 
3996
/* ----------------------------------------------------------------------
 
3997
 * L O G   F L U S H    P R O C E S S
 
3998
 */
 
3999
 
 
4000
static void *xres_fl_run_thread(XTThreadPtr self)
 
4001
{
 
4002
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
 
4003
        int                             count;
 
4004
        void                    *mysql_thread;
 
4005
        xtWord8                 to_flush;
 
4006
 
 
4007
        if (!(mysql_thread = myxt_create_thread()))
 
4008
                xt_throw(self);
 
4009
 
 
4010
        while (!self->t_quit) {
 
4011
                try_(a) {
 
4012
                        /*
 
4013
                         * The garbage collector requires that the database
 
4014
                         * is in use because.
 
4015
                         */
 
4016
                        xt_use_database(self, db, XT_FOR_CHECKPOINTER);
 
4017
 
 
4018
                        /* This action is both safe and required (see details elsewhere) */
 
4019
                        xt_heap_release(self, self->st_database);
 
4020
 
 
4021
                        xt_set_low_priority(self);
 
4022
 
 
4023
                        to_flush = xt_trace_clock() + XT_XLOG_FLUSH_FREQ * 1000;
 
4024
                        for (;;) {
 
4025
                                /* Wait 1 second: */
 
4026
                                while (!self->t_quit && xt_trace_clock() < to_flush)
 
4027
                                        xt_sleep_milli_second(10);
 
4028
 
 
4029
                                if (self->t_quit)
 
4030
                                        break;
 
4031
 
 
4032
                                if (!db->db_xlog.xlog_flush(self))
 
4033
                                        xt_throw(self);
 
4034
 
 
4035
                                to_flush += XT_XLOG_FLUSH_FREQ * 1000;
 
4036
                        }
 
4037
                }
 
4038
                catch_(a) {
 
4039
                        /* This error is "normal"! */
 
4040
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
 
4041
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
 
4042
                                self->t_exception.e_sys_err == SIGTERM))
 
4043
                                xt_log_and_clear_exception(self);
 
4044
                }
 
4045
                cont_(a);
 
4046
 
 
4047
                /* Avoid releasing the database (done above) */
 
4048
                self->st_database = NULL;
 
4049
                xt_unuse_database(self, self);
 
4050
 
 
4051
                /* After an exception, pause before trying again... */
 
4052
                /* Number of seconds */
 
4053
                count = 60;
 
4054
                while (!self->t_quit && count > 0) {
 
4055
                        sleep(1);
 
4056
                        count--;
 
4057
                }
 
4058
        }
 
4059
 
 
4060
   /*
 
4061
        * {MYSQL-THREAD-KILL}
 
4062
        myxt_destroy_thread(mysql_thread, TRUE);
 
4063
        */
 
4064
        return NULL;
 
4065
}
 
4066
 
 
4067
static void xres_fl_free_thread(XTThreadPtr self, void *data)
 
4068
{
 
4069
        XTDatabaseHPtr db = (XTDatabaseHPtr) data;
 
4070
 
 
4071
        if (db->db_fl_thread) {
 
4072
                xt_lock_mutex(self, &db->db_fl_lock);
 
4073
                pushr_(xt_unlock_mutex, &db->db_fl_lock);
 
4074
                db->db_fl_thread = NULL;
 
4075
                freer_(); // xt_unlock_mutex(&db->db_fl_lock)
 
4076
        }
 
4077
}
 
4078
 
 
4079
xtPublic void xt_start_flusher(XTThreadPtr self, XTDatabaseHPtr db)
 
4080
{
 
4081
        char name[PATH_MAX];
 
4082
 
 
4083
        sprintf(name, "FL-%s", xt_last_directory_of_path(db->db_main_path));
 
4084
        xt_remove_dir_char(name);
 
4085
        db->db_fl_thread = xt_create_daemon(self, name);
 
4086
        xt_set_thread_data(db->db_fl_thread, db, xres_fl_free_thread);
 
4087
        xt_run_thread(self, db->db_fl_thread, xres_fl_run_thread);
 
4088
}
 
4089
 
 
4090
xtPublic void xt_stop_flusher(XTThreadPtr self, XTDatabaseHPtr db)
 
4091
{
 
4092
        XTThreadPtr thr_fl;
 
4093
 
 
4094
        if (db->db_fl_thread) {
 
4095
                xt_lock_mutex(self, &db->db_fl_lock);
 
4096
                pushr_(xt_unlock_mutex, &db->db_fl_lock);
 
4097
 
 
4098
                /* This pointer is safe as long as you have the transaction lock. */
 
4099
                if ((thr_fl = db->db_fl_thread)) {
 
4100
                        xtThreadID tid = thr_fl->t_id;
 
4101
 
 
4102
                        /* Make sure the thread quits when woken up. */
 
4103
                        xt_terminate_thread(self, thr_fl);
 
4104
 
 
4105
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
 
4106
 
 
4107
                        xt_wait_for_thread_to_exit(tid, FALSE);
 
4108
                        db->db_fl_thread = NULL;
 
4109
                }
 
4110
                else
 
4111
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
 
4112
        }
 
4113
}
 
4114