~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/restart_xt.cc

  • Committer: Monty Taylor
  • Date: 2011-03-09 20:59:40 UTC
  • mfrom: (2226.1.14 build)
  • Revision ID: mordred@inaugust.com-20110309205940-7f5mk6zba2u7bawa
Merged Dave - Filtered Replication docs
Merged Olaf - Refactoring work
Removed archive, blackhole, filesystem_engine, blitzdb, csv and pbxt from
the tree pre-GA as we have no interest in supporting them moving forward.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2007 PrimeBase Technologies GmbH
2
 
 *
3
 
 * PrimeBase XT
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * 2007-11-12   Paul McCullagh
20
 
 *
21
 
 * H&G2JCtL
22
 
 *
23
 
 * Restart and write data to the database.
24
 
 */
25
 
 
26
 
#include "xt_config.h"
27
 
 
28
 
#include <signal.h>
29
 
#include <time.h>
30
 
 
31
 
#ifndef DRIZZLED
32
 
#include "mysql_priv.h"
33
 
#endif
34
 
 
35
 
#include "ha_pbxt.h"
36
 
 
37
 
#ifdef DRIZZLED
38
 
#include <drizzled/data_home.h>
39
 
using drizzled::module::Registry;
40
 
#endif
41
 
 
42
 
#include "xactlog_xt.h"
43
 
#include "database_xt.h"
44
 
#include "util_xt.h"
45
 
#include "strutil_xt.h"
46
 
#include "filesys_xt.h"
47
 
#include "myxt_xt.h"
48
 
#include "trace_xt.h"
49
 
 
50
 
/* Forward declaration of a file-local (static) function, so it can be referenced before its definition. */
static xtBool xres_get_next_to_flush(XTDatabaseHPtr db, xtTableID *tab_id, int *flush_bit, xtBool skip_busy, xtBool rec_first);
51
 
 
52
 
#ifdef DEBUG
53
 
//#define DEBUG_PRINT
54
 
//#define DEBUG_KEEP_LOGS
55
 
//#define PRINT_LOG_ON_RECOVERY
56
 
//#define TRACE_RECORD_DATA
57
 
//#define SKIP_STARTUP_CHECKPOINT
58
 
//#define NEVER_CHECKPOINT
59
 
//#define TRACE_CHECKPOINT
60
 
#endif
61
 
 
62
 
/* Output routine used by the debug/trace printing code in this file (printf by default). */
#define PRINTF          printf
63
 
//#define PRINTF                xt_ftracef
64
 
//#define PRINTF                xt_trace
65
 
 
66
 
/*
67
 
 * -----------------------------------------------------------------------
68
 
 * GLOBALS
69
 
 */
70
 
 
71
 
/*
 * Current recovery state of the engine.
 * NOTE(review): the set of valid values and who updates this is not visible
 * in this part of the file -- confirm against the header that declares it.
 */
xtPublic int                            pbxt_recovery_state;
72
 
 
73
 
/*
74
 
 * -----------------------------------------------------------------------
75
 
 * UTILITIES
76
 
 */
77
 
 
78
 
#ifdef TRACE_RECORD_DATA
79
 
static void xt_print_bytes(xtWord1 *buf, u_int len)
80
 
{
81
 
        for (u_int i=0; i<len; i++) {
82
 
                PRINTF("%02x ", (u_int) *buf);
83
 
                buf++;
84
 
        }
85
 
}
86
 
#endif
87
 
 
88
 
void xt_print_log_record(xtLogID log, xtLogOffset offset, XTXactLogBufferDPtr record)
89
 
{
90
 
        const char              *type = NULL;
91
 
        const char              *rec_type = NULL;
92
 
        xtOpSeqNo               op_no = 0;
93
 
        xtTableID               tab_id = 0;
94
 
        xtRowID                 row_id = 0;
95
 
        xtRecordID              rec_id = 0;
96
 
        xtBool                  xn_set = FALSE;
97
 
        xtXactID                xn_id = 0;
98
 
        char                    buffer[200];
99
 
        XTTabRecExtDPtr rec_buf;
100
 
        XTTabRecExtDPtr ext_rec;
101
 
        XTTabRecFixDPtr fix_rec;
102
 
        u_int                   rec_len;
103
 
        xtLogID                 log_id = 0;
104
 
        xtLogOffset             log_offset = 0;
105
 
 
106
 
        rec_buf = NULL;
107
 
        ext_rec = NULL;
108
 
        fix_rec = NULL;
109
 
        rec_len = 0;
110
 
        switch (record->xl.xl_status_1) {
111
 
                case XT_LOG_ENT_REC_MODIFIED:
112
 
                case XT_LOG_ENT_UPDATE:
113
 
                case XT_LOG_ENT_INSERT:
114
 
                case XT_LOG_ENT_DELETE:
115
 
                case XT_LOG_ENT_UPDATE_BG:
116
 
                case XT_LOG_ENT_INSERT_BG:
117
 
                case XT_LOG_ENT_DELETE_BG:
118
 
                        op_no = XT_GET_DISK_4(record->xu.xu_op_seq_4);
119
 
                        tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
120
 
                        rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
121
 
                        xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
122
 
                        row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
123
 
                        rec_len = XT_GET_DISK_2(record->xu.xu_size_2);
124
 
                        xn_set = TRUE;
125
 
                        type="rec";
126
 
                        rec_buf = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
127
 
                        ext_rec = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
128
 
                        if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
129
 
                                log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
130
 
                                log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
131
 
                        }
132
 
                        else {
133
 
                                ext_rec = NULL;
134
 
                                fix_rec = (XTTabRecFixDPtr) &record->xu.xu_rec_type_1;
135
 
                        }
136
 
                        break;
137
 
                case XT_LOG_ENT_UPDATE_FL:
138
 
                case XT_LOG_ENT_INSERT_FL:
139
 
                case XT_LOG_ENT_DELETE_FL:
140
 
                case XT_LOG_ENT_UPDATE_FL_BG:
141
 
                case XT_LOG_ENT_INSERT_FL_BG:
142
 
                case XT_LOG_ENT_DELETE_FL_BG:
143
 
                        op_no = XT_GET_DISK_4(record->xf.xf_op_seq_4);
144
 
                        tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
145
 
                        rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
146
 
                        xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
147
 
                        row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
148
 
                        rec_len = XT_GET_DISK_2(record->xf.xf_size_2);
149
 
                        xn_set = TRUE;
150
 
                        type="rec";
151
 
                        rec_buf = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
152
 
                        ext_rec = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
153
 
                        if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
154
 
                                log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
155
 
                                log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
156
 
                        }
157
 
                        else {
158
 
                                ext_rec = NULL;
159
 
                                fix_rec = (XTTabRecFixDPtr) &record->xf.xf_rec_type_1;
160
 
                        }
161
 
                        break;
162
 
                case XT_DEFUNKT_REC_FREED:
163
 
                case XT_DEFUNKT_REC_REMOVED:
164
 
                case XT_DEFUNKT_REC_REMOVED_EXT:
165
 
                        op_no = XT_GET_DISK_4(record->fr.fr_op_seq_4);
166
 
                        tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);
167
 
                        rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
168
 
                        xn_id = XT_GET_DISK_4(record->fr.fr_xact_id_4);
169
 
                        xn_set = TRUE;
170
 
                        type="rec";
171
 
                        break;
172
 
                case XT_LOG_ENT_REC_REMOVED_BI:
173
 
                        op_no = XT_GET_DISK_4(record->rb.rb_op_seq_4);
174
 
                        tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
175
 
                        rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
176
 
                        xn_id = XT_GET_DISK_4(record->rb.rb_xact_id_4);
177
 
                        row_id = XT_GET_DISK_4(record->rb.rb_row_id_4);
178
 
                        rec_len = XT_GET_DISK_2(record->rb.rb_size_2);
179
 
                        xn_set = TRUE;
180
 
                        type="rec";
181
 
                        rec_buf = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
182
 
                        ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
183
 
                        if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {
184
 
                                log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
185
 
                                log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
186
 
                        }
187
 
                        else {
188
 
                                ext_rec = NULL;
189
 
                                fix_rec = (XTTabRecFixDPtr) &record->rb.rb_rec_type_1;
190
 
                        }
191
 
                        break;
192
 
                case XT_LOG_ENT_REC_REMOVED_BI_L:
193
 
                        op_no = XT_GET_DISK_4(record->bl.bl_op_seq_4);
194
 
                        tab_id = XT_GET_DISK_4(record->bl.bl_tab_id_4);
195
 
                        rec_id = XT_GET_DISK_4(record->bl.bl_rec_id_4);
196
 
                        xn_id = XT_GET_DISK_4(record->bl.bl_xact_id_4);
197
 
                        row_id = XT_GET_DISK_4(record->bl.bl_row_id_4);
198
 
                        rec_len = XT_GET_DISK_2(record->bl.bl_size_2);
199
 
                        xn_set = TRUE;
200
 
                        type="rec";
201
 
                        rec_buf = (XTTabRecExtDPtr) &record->bl.bl_rec_type_1;
202
 
                        ext_rec = (XTTabRecExtDPtr) &record->bl.bl_rec_type_1;
203
 
                        if (XT_REC_IS_EXT_DLOG(record->bl.bl_rec_type_1)) {
204
 
                                log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
205
 
                                log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
206
 
                        }
207
 
                        else {
208
 
                                ext_rec = NULL;
209
 
                                fix_rec = (XTTabRecFixDPtr) &record->bl.bl_rec_type_1;
210
 
                        }
211
 
                        break;
212
 
                case XT_LOG_ENT_REC_MOVED:
213
 
                        op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
214
 
                        tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
215
 
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
216
 
                        log_id = XT_GET_DISK_2(&record->xw.xw_rec_type_1);                      // This is actually correct
217
 
                        log_offset = XT_GET_DISK_6(record->xw.xw_next_rec_id_4);        // This is actually correct!
218
 
                        type="rec";
219
 
                        break;
220
 
                case XT_LOG_ENT_REC_CLEANED:
221
 
                case XT_LOG_ENT_REC_CLEANED_1:
222
 
                case XT_LOG_ENT_REC_UNLINKED:
223
 
                        op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
224
 
                        tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
225
 
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
226
 
                        type="rec";
227
 
                        break;
228
 
                case XT_LOG_ENT_ROW_NEW:
229
 
                case XT_LOG_ENT_ROW_NEW_FL:
230
 
                case XT_LOG_ENT_ROW_ADD_REC:
231
 
                case XT_LOG_ENT_ROW_SET:
232
 
                case XT_LOG_ENT_ROW_FREED:
233
 
                        op_no = XT_GET_DISK_4(record->xa.xa_op_seq_4);
234
 
                        tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
235
 
                        rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
236
 
                        type="row";
237
 
                        break;
238
 
                case XT_LOG_ENT_NO_OP:
239
 
                        op_no = XT_GET_DISK_4(record->no.no_op_seq_4);
240
 
                        tab_id = XT_GET_DISK_4(record->no.no_tab_id_4);
241
 
                        type="-";
242
 
                        break;
243
 
                case XT_LOG_ENT_END_OF_LOG:
244
 
                        break;
245
 
        }
246
 
 
247
 
        switch (record->xl.xl_status_1) {
248
 
                case XT_LOG_ENT_HEADER:
249
 
                        rec_type = "HEADER";
250
 
                        break;
251
 
                case XT_LOG_ENT_NEW_LOG:
252
 
                        rec_type = "NEW LOG";
253
 
                        break;
254
 
                case XT_LOG_ENT_DEL_LOG:
255
 
                        sprintf(buffer, "DEL LOG log=%d ", (int) XT_GET_DISK_4(record->xl.xl_log_id_4));
256
 
                        rec_type = buffer;
257
 
                        break;
258
 
                case XT_LOG_ENT_NEW_TAB:
259
 
                        rec_type = "NEW TABLE";
260
 
                        tab_id = XT_GET_DISK_4(record->xt.xt_tab_id_4);
261
 
                        break;
262
 
                case XT_LOG_ENT_COMMIT:
263
 
                        rec_type = "COMMIT";
264
 
                        xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
265
 
                        xn_set = TRUE;
266
 
                        break;
267
 
                case XT_LOG_ENT_ABORT:
268
 
                        rec_type = "ABORT";
269
 
                        xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
270
 
                        xn_set = TRUE;
271
 
                        break;
272
 
                case XT_LOG_ENT_CLEANUP:
273
 
                        rec_type = "CLEANUP";
274
 
                        xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
275
 
                        xn_set = TRUE;
276
 
                        break;
277
 
                case XT_LOG_ENT_REC_MODIFIED:
278
 
                        rec_type = "MODIFIED";
279
 
                        break;
280
 
                case XT_LOG_ENT_UPDATE:
281
 
                        rec_type = "UPDATE";
282
 
                        break;
283
 
                case XT_LOG_ENT_UPDATE_FL:
284
 
                        rec_type = "UPDATE-FL";
285
 
                        break;
286
 
                case XT_LOG_ENT_INSERT:
287
 
                        rec_type = "INSERT";
288
 
                        break;
289
 
                case XT_LOG_ENT_INSERT_FL:
290
 
                        rec_type = "INSERT-FL";
291
 
                        break;
292
 
                case XT_LOG_ENT_DELETE:
293
 
                        rec_type = "DELETE";
294
 
                        break;
295
 
                case XT_LOG_ENT_DELETE_FL:
296
 
                        rec_type = "DELETE-FL";
297
 
                        break;
298
 
                case XT_LOG_ENT_UPDATE_BG:
299
 
                        rec_type = "UPDATE-BG";
300
 
                        break;
301
 
                case XT_LOG_ENT_UPDATE_FL_BG:
302
 
                        rec_type = "UPDATE-FL-BG";
303
 
                        break;
304
 
                case XT_LOG_ENT_INSERT_BG:
305
 
                        rec_type = "INSERT-BG";
306
 
                        break;
307
 
                case XT_LOG_ENT_INSERT_FL_BG:
308
 
                        rec_type = "INSERT-FL-BG";
309
 
                        break;
310
 
                case XT_LOG_ENT_DELETE_BG:
311
 
                        rec_type = "DELETE-BG";
312
 
                        break;
313
 
                case XT_LOG_ENT_DELETE_FL_BG:
314
 
                        rec_type = "DELETE-FL-BG";
315
 
                        break;
316
 
                case XT_DEFUNKT_REC_FREED:
317
 
                        rec_type = "FREE REC";
318
 
                        break;
319
 
                case XT_DEFUNKT_REC_REMOVED:
320
 
                        rec_type = "REMOVED REC";
321
 
                        break;
322
 
                case XT_DEFUNKT_REC_REMOVED_EXT:
323
 
                        rec_type = "REMOVED-X REC";
324
 
                        break;
325
 
                case XT_LOG_ENT_REC_REMOVED_BI:
326
 
                        rec_type = "REMOVED-BI REC";
327
 
                        break;
328
 
                case XT_LOG_ENT_REC_REMOVED_BI_L:
329
 
                        rec_type = "REMOVED-BI (L) REC";
330
 
                        break;
331
 
                case XT_LOG_ENT_REC_MOVED:
332
 
                        rec_type = "MOVED REC";
333
 
                        break;
334
 
                case XT_LOG_ENT_REC_CLEANED:
335
 
                        rec_type = "CLEAN REC";
336
 
                        break;
337
 
                case XT_LOG_ENT_REC_CLEANED_1:
338
 
                        rec_type = "CLEAN REC-1";
339
 
                        break;
340
 
                case XT_LOG_ENT_REC_UNLINKED:
341
 
                        rec_type = "UNLINK REC";
342
 
                        break;
343
 
                case XT_LOG_ENT_ROW_NEW:
344
 
                        rec_type = "NEW ROW";
345
 
                        break;
346
 
                case XT_LOG_ENT_ROW_NEW_FL:
347
 
                        rec_type = "NEW ROW-FL";
348
 
                        break;
349
 
                case XT_LOG_ENT_ROW_ADD_REC:
350
 
                        rec_type = "REC ADD ROW";
351
 
                        break;
352
 
                case XT_LOG_ENT_ROW_SET:
353
 
                        rec_type = "SET ROW";
354
 
                        break;
355
 
                case XT_LOG_ENT_ROW_FREED:
356
 
                        rec_type = "FREE ROW";
357
 
                        break;
358
 
                case XT_LOG_ENT_OP_SYNC:
359
 
                        rec_type = "OP SYNC";
360
 
                        break;
361
 
                case XT_LOG_ENT_NO_OP:
362
 
                        rec_type = "NO OP";
363
 
                        break;
364
 
                case XT_LOG_ENT_END_OF_LOG:
365
 
                        rec_type = "END OF LOG";
366
 
                        break;
367
 
                case XT_LOG_ENT_PREPARE:
368
 
                        rec_type = "PREPARE";
369
 
                        xn_id = XT_GET_DISK_4(record->xp.xp_xact_id_4);
370
 
                        xn_set = TRUE;
371
 
                        break;
372
 
        }
373
 
 
374
 
        if (log)
375
 
                PRINTF("log=%d offset=%d ", (int) log, (int) offset);
376
 
        PRINTF("%s ", rec_type);
377
 
        if (type)
378
 
                PRINTF("op=%lu tab=%lu %s=%lu ", (u_long) op_no, (u_long) tab_id, type, (u_long) rec_id);
379
 
        else if (tab_id)
380
 
                PRINTF("tab=%lu ", (u_long) tab_id);
381
 
        if (row_id)
382
 
                PRINTF("row=%lu ", (u_long) row_id);
383
 
        if (log_id)
384
 
                PRINTF("log=%lu offset=%lu ", (u_long) log_id, (u_long) log_offset);
385
 
        if (xn_set)
386
 
                PRINTF("xact=%lu ", (u_long) xn_id);
387
 
 
388
 
#ifdef TRACE_RECORD_DATA
389
 
        if (rec_buf) {
390
 
                switch (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_MASK) {
391
 
                        case XT_TAB_STATUS_FREED:
392
 
                                PRINTF("FREE");
393
 
                                break;
394
 
                        case XT_TAB_STATUS_DELETE:
395
 
                                PRINTF("DELE");
396
 
                                break;
397
 
                        case XT_TAB_STATUS_FIXED:
398
 
                                PRINTF("FIX-");
399
 
                                break;
400
 
                        case XT_TAB_STATUS_VARIABLE:
401
 
                                PRINTF("VAR-");
402
 
                                break;
403
 
                        case XT_TAB_STATUS_EXT_DLOG:
404
 
                                PRINTF("EXT-");
405
 
                                break;
406
 
                }
407
 
                if (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT)
408
 
                        PRINTF("C");
409
 
                else
410
 
                        PRINTF(" ");
411
 
        }
412
 
        if (ext_rec) {
413
 
                rec_len -= offsetof(XTTabRecExtDRec, re_data);
414
 
                xt_print_bytes((xtWord1 *) ext_rec, offsetof(XTTabRecExtDRec, re_data));
415
 
                PRINTF("| ");
416
 
                if (rec_len > 20)
417
 
                        rec_len = 20;
418
 
                xt_print_bytes(ext_rec->re_data, rec_len);
419
 
        }
420
 
        if (fix_rec) {
421
 
                rec_len -= offsetof(XTTabRecFixDRec, rf_data);
422
 
                xt_print_bytes((xtWord1 *) fix_rec, offsetof(XTTabRecFixDRec, rf_data));
423
 
                PRINTF("| ");
424
 
                if (rec_len > 20)
425
 
                        rec_len = 20;
426
 
                xt_print_bytes(fix_rec->rf_data, rec_len);
427
 
        }
428
 
#endif
429
 
 
430
 
        PRINTF("\n");
431
 
}
432
 
 
433
 
#ifdef DEBUG_PRINT
434
 
void check_rows(void)
435
 
{
436
 
        static XTOpenFilePtr of = NULL;
437
 
 
438
 
        if (!of)
439
 
                of = xt_open_file_ns("./test/test_tab-1.xtr", XT_FS_DEFAULT);
440
 
        if (of) {
441
 
                size_t size = (size_t) xt_seek_eof_file(NULL, of);
442
 
                xtWord8 *buffer = (xtWord8 *) xt_malloc_ns(size);
443
 
                xt_pread_file(of, 0, size, size, buffer, NULL);
444
 
                for (size_t i=0; i<size/8; i++) {
445
 
                        if (!buffer[i])
446
 
                                printf("%d is NULL\n", (int) i);
447
 
                }
448
 
        }
449
 
}
450
 
 
451
 
#endif
452
 
 
453
 
/*
454
 
 * -----------------------------------------------------------------------
455
 
 * DELAYED WRITE FUNCTIONS
456
 
 */
457
 
 
458
 
#ifdef XT_SORT_REC_WRITES
459
 
 
460
 
xtPublic xtBool xt_xres_delay_flush(XTOpenTablePtr ot, xtBool lock)
461
 
{
462
 
        XTTableHPtr             tab = ot->ot_table;
463
 
        XTDelayWritePtr ptr;
464
 
        XTThreadPtr             thread = ot->ot_thread;
465
 
 
466
 
        if (!xt_sl_get_size(tab->tab_rec_dw_writes))
467
 
                return OK;
468
 
 
469
 
        if (lock)
470
 
                xt_sl_lock_ns(tab->tab_rec_dw_writes, ot->ot_thread);
471
 
        if ((ptr = (XTDelayWritePtr) xt_sl_item_at(tab->tab_rec_dw_writes, 0))) {
472
 
                u_int i, size;
473
 
 
474
 
                size = xt_sl_get_size(tab->tab_rec_dw_writes);
475
 
                i = 0;
476
 
                while (i<size) {
477
 
                        if (i+1 < size &&
478
 
                                ptr->dw_rec_id+1 == (ptr+1)->dw_rec_id &&
479
 
                                (ptr+1)->dw_offset == 0 &&
480
 
                                ptr->dw_data + tab->tab_dic.dic_rec_size - ptr->dw_offset == (ptr+1)->dw_data) {
481
 
                                XTDelayWritePtr start_ptr;
482
 
                                u_int                   start_i;
483
 
                                size_t                  tfer;
484
 
 
485
 
                                start_ptr = ptr;
486
 
                                start_i = i;
487
 
                                while (i+1 < size &&
488
 
                                        ptr->dw_rec_id+1 == (ptr+1)->dw_rec_id && 
489
 
                                        (ptr+1)->dw_offset == 0 &&
490
 
                                        ptr->dw_data + tab->tab_dic.dic_rec_size - ptr->dw_offset == (ptr+1)->dw_data) {
491
 
                                        ptr++;
492
 
                                        i++;
493
 
                                }
494
 
                                tfer = (ptr->dw_data + ptr->dw_size) - start_ptr->dw_data;
495
 
                                if (!xt_pwrite_file(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, start_ptr->dw_rec_id) + start_ptr->dw_offset, tfer, &tab->tab_rec_dw_data[start_ptr->dw_data], &thread->st_statistics.st_rec, thread))
496
 
                                        goto failed;
497
 
                        }
498
 
                        else {
499
 
                                if (!xt_pwrite_file(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, ptr->dw_rec_id) + ptr->dw_offset, ptr->dw_size, &tab->tab_rec_dw_data[ptr->dw_data], &thread->st_statistics.st_rec, thread))
500
 
                                        goto failed;
501
 
                        }
502
 
                        ptr++;
503
 
                        i++;
504
 
                }
505
 
        }
506
 
        xt_sl_set_size(tab->tab_rec_dw_writes, 0);
507
 
        tab->tab_rec_dw_data_usage = 0;
508
 
        tab->tab_head_op_seq = tab->tab_rec_dw_op_seq;
509
 
        if (lock)
510
 
                xt_sl_unlock_ns(tab->tab_rec_dw_writes);
511
 
        return OK;
512
 
 
513
 
        failed:
514
 
        if (lock)
515
 
                xt_sl_unlock_ns(tab->tab_rec_dw_writes);
516
 
        return FAILED;
517
 
}
518
 
 
519
 
xtPublic void xt_xres_flush_all(XTThreadPtr self, XTWriterStatePtr ws)
520
 
{
521
 
        xtTableID               tab_id;
522
 
        XTOpenTablePtr  ot;
523
 
 
524
 
        for (int i=0; i<XT_TABLE_LIST_SIZE; i++) {
525
 
                if ((tab_id = ws->ws_tab_flush_list[i])) {
526
 
                        if ((ot = ws->ws_ot) && ot->ot_table->tab_id == tab_id) {
527
 
                                if (!xt_xres_delay_flush(ot, TRUE))
528
 
                                        xt_throw(self);
529
 
                        }
530
 
                        else {
531
 
                                if ((ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE))) {
532
 
                                        if (!xt_xres_delay_flush(ot, TRUE))
533
 
                                                xt_throw(self);
534
 
                                        xt_db_return_table_to_pool(self, ot);
535
 
                                }
536
 
                        }
537
 
                        ws->ws_tab_flush_list[i] = 0;
538
 
                }
539
 
        }
540
 
}
541
 
 
542
 
static void xres_table_to_flush(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)
543
 
{
544
 
        u_int idx;
545
 
 
546
 
        retry:
547
 
        idx = tab_id % XT_TABLE_LIST_SIZE;
548
 
        if (ws->ws_tab_flush_list[idx] == tab_id)
549
 
                return;
550
 
        if (!ws->ws_tab_flush_list[idx])
551
 
                goto add_it;
552
 
        idx = (idx + XT_TABLE_LIST_INC) % XT_TABLE_LIST_SIZE;
553
 
        if (ws->ws_tab_flush_list[idx] == tab_id)
554
 
                return;
555
 
        if (!ws->ws_tab_flush_list[idx])
556
 
                goto add_it;
557
 
        idx = (idx + XT_TABLE_LIST_INC) % XT_TABLE_LIST_SIZE;
558
 
        if (ws->ws_tab_flush_list[idx] == tab_id)
559
 
                return;
560
 
        if (!ws->ws_tab_flush_list[idx])
561
 
                goto add_it;
562
 
        idx = (idx + XT_TABLE_LIST_INC) % XT_TABLE_LIST_SIZE;
563
 
        if (ws->ws_tab_flush_list[idx] == tab_id)
564
 
                return;
565
 
        if (!ws->ws_tab_flush_list[idx])
566
 
                goto add_it;
567
 
 
568
 
        /* After 4 tries we consider the table too full: */
569
 
        xt_xres_flush_all(self, ws);
570
 
        goto retry;
571
 
 
572
 
        add_it:
573
 
        ws->ws_tab_flush_list[idx] = tab_id;
574
 
}
575
 
 
576
 
static void xres_delay_write(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, size_t offs, size_t size, xtWord1 *data, xtBool in_sequence, XTWriterStatePtr ws)
577
 
{
578
 
        XTTableHPtr tab = ot->ot_table;
579
 
 
580
 
        ASSERT_NS(offs <= 0xFFFF);
581
 
        ASSERT_NS(size <= 0xFFFF);
582
 
        if (in_sequence && tab->tab_rec_dw_writes) {
583
 
                size_t                  idx;
584
 
                XTDelayWriteRec dw, *dw_ptr;
585
 
                size_t                  part_size;
586
 
 
587
 
                xt_sl_lock_ns(tab->tab_rec_dw_writes, ot->ot_thread);
588
 
 
589
 
                if (tab->tab_rec_dw_data_usage > XT_SORT_REC_MAX_BUF_SIZE) {
590
 
                        if (!xt_xres_delay_flush(ot, FALSE))
591
 
                                goto failed;
592
 
                }
593
 
 
594
 
                add_new_rec:
595
 
                if ((dw_ptr = (XTDelayWritePtr) xt_sl_search(self, &idx, tab->tab_rec_dw_writes, &rec_id))) {
596
 
                        /* Record has already been written: */
597
 
                        if (offs >= dw_ptr->dw_offset) {
598
 
                                if (offs + size <= dw_ptr->dw_offset + dw_ptr->dw_size)
599
 
                                        /* Completely in old block: */
600
 
                                        memcpy(tab->tab_rec_dw_data + dw_ptr->dw_data + (offs - dw_ptr->dw_offset), data, size);
601
 
                                else {
602
 
                                        if (offs > dw_ptr->dw_offset + dw_ptr->dw_size) {
603
 
                                                /* Records do not overlap (cannot handle this situation)! */
604
 
                                                if (!xt_xres_delay_flush(ot, FALSE))
605
 
                                                        goto failed;
606
 
                                                goto add_new_rec;
607
 
                                        }
608
 
 
609
 
                                        /* The current record overlaps, and follows on: */
610
 
                                        part_size = offs - dw_ptr->dw_offset;
611
 
                                        
612
 
                                        if (tab->tab_rec_dw_data_usage + part_size + size > tab->tab_rec_dw_data_size) {
613
 
                                                if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + part_size + size))
614
 
                                                        goto failed;
615
 
                                                tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + part_size + size;
616
 
                                        }
617
 
                                        
618
 
                                        memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage, tab->tab_rec_dw_data + dw_ptr->dw_data, part_size);
619
 
                                        memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage + part_size, data, size);
620
 
                                        dw_ptr->dw_size = (xtWord2) (part_size + size);
621
 
                                        dw_ptr->dw_data = tab->tab_rec_dw_data_usage;
622
 
                                        tab->tab_rec_dw_data_usage += part_size + size;
623
 
                                }
624
 
                        }
625
 
                        else {
626
 
                                if (offs + size >= dw_ptr->dw_offset + dw_ptr->dw_size) {
627
 
                                        /* Completely covers old update (and is bigger): */
628
 
                                        if (tab->tab_rec_dw_data_usage + size > tab->tab_rec_dw_data_size) {
629
 
                                                if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + size))
630
 
                                                        goto failed;
631
 
                                                tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + size;
632
 
                                        }
633
 
                                        memcpy(&tab->tab_rec_dw_data[tab->tab_rec_dw_data_usage], data, size);
634
 
                                        dw_ptr->dw_offset = (xtWord2) offs;
635
 
                                        dw_ptr->dw_size = (xtWord2) size;
636
 
                                        dw_ptr->dw_data = tab->tab_rec_dw_data_usage;
637
 
                                        tab->tab_rec_dw_data_usage += size;
638
 
                                }
639
 
                                else {
640
 
                                        if (offs + size < dw_ptr->dw_offset) {
641
 
                                                /* Records do not overlap (cannot handle this situation)! */
642
 
                                                if (!xt_xres_delay_flush(ot, FALSE))
643
 
                                                        goto failed;
644
 
                                                goto add_new_rec;
645
 
                                        }
646
 
 
647
 
                                        /* The part in the record we want to keep: */
648
 
                                        part_size = dw_ptr->dw_offset + dw_ptr->dw_size - (offs + size);
649
 
 
650
 
                                        if (tab->tab_rec_dw_data_usage + part_size + size > tab->tab_rec_dw_data_size) {
651
 
                                                if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + part_size + size))
652
 
                                                        goto failed;
653
 
                                                tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + part_size + size;
654
 
                                        }
655
 
 
656
 
                                        memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage, data, size);
657
 
                                        memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage + size, tab->tab_rec_dw_data + dw_ptr->dw_data + dw_ptr->dw_size - part_size, part_size);
658
 
                                        dw_ptr->dw_offset = (xtWord2) offs;
659
 
                                        dw_ptr->dw_size = (xtWord2) (part_size + size);
660
 
                                        dw_ptr->dw_data = tab->tab_rec_dw_data_usage;
661
 
                                        tab->tab_rec_dw_data_usage += part_size + size;
662
 
                                }
663
 
                        }
664
 
                        
665
 
                        goto done_add;
666
 
                }
667
 
 
668
 
                /* New record added, make space for at least 3 records! */
669
 
                size_t inc_size;
670
 
 
671
 
                inc_size = tab->tab_dic.dic_rec_size * 3;
672
 
                ASSERT(size*3 <= inc_size);
673
 
                if (tab->tab_rec_dw_data_usage + inc_size > tab->tab_rec_dw_data_size) {
674
 
                        if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + (inc_size * 4)))
675
 
                                goto failed;
676
 
                        tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + (inc_size * 4);
677
 
                }
678
 
 
679
 
                if (offs == 0) {
680
 
                        if (idx > 0) {
681
 
                                /* This record follows on after another */
682
 
                                dw_ptr = (XTDelayWritePtr) xt_sl_item_at(tab->tab_rec_dw_writes, idx-1);
683
 
                                
684
 
                                /* Check if the record before can be expanded! */
685
 
                                if (dw_ptr->dw_rec_id+1 == rec_id &&
686
 
                                        dw_ptr->dw_offset == 0 &&
687
 
                                        dw_ptr->dw_size >= XT_REC_EXT_HEADER_SIZE &&
688
 
                                        dw_ptr->dw_size < tab->tab_dic.dic_rec_size &&
689
 
                                        dw_ptr->dw_data + dw_ptr->dw_size == tab->tab_rec_dw_data_usage) {
690
 
                                        /* Round the size up... */
691
 
                                        tab->tab_rec_dw_data_usage = dw_ptr->dw_data + tab->tab_dic.dic_rec_size;
692
 
                                }
693
 
                        }
694
 
                }
695
 
 
696
 
                memcpy(&tab->tab_rec_dw_data[tab->tab_rec_dw_data_usage], data, size);
697
 
                dw.dw_rec_id = rec_id;
698
 
                dw.dw_offset = (xtWord2) offs;
699
 
                dw.dw_size = (xtWord2) size;
700
 
                dw.dw_data = tab->tab_rec_dw_data_usage;
701
 
                xt_sl_insert_item_at(self, tab->tab_rec_dw_writes, idx, &dw);
702
 
                tab->tab_rec_dw_data_usage += size;
703
 
 
704
 
                done_add:
705
 
                xt_sl_unlock_ns(tab->tab_rec_dw_writes);
706
 
 
707
 
                /* Add the table to the table list! */
708
 
                return xres_table_to_flush(self, ws, tab->tab_id);
709
 
        }
710
 
 
711
 
        if (xt_sl_get_size(tab->tab_rec_dw_writes)) {
712
 
                if (!xt_xres_delay_flush(ot, TRUE))
713
 
                        xt_throw(self);
714
 
        }
715
 
 
716
 
        if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, rec_id) + offs, size, data))
717
 
                xt_throw(self);
718
 
        return;
719
 
 
720
 
        failed:
721
 
        xt_sl_unlock_ns(tab->tab_rec_dw_writes);
722
 
        xt_throw(self);
723
 
}
724
 
 
725
 
/* Record-file write used by the log-apply code in this build variant:
 * every argument is forwarded unchanged to xres_delay_write().
 */
#define WRITE_REC_FILE(self, a, rec_id, off, c, d, e, ws)       xres_delay_write(self, a, rec_id, off, c, d, e, ws)
726
 
 
727
 
#else
728
 
 
729
 
static void xres_write_rec(XTThreadPtr self, XTOpenTablePtr ot, off_t offset, size_t size, xtWord1 *data)
730
 
{
731
 
        if (!xt_tab_write_rec(ot, offset, size, data))
732
 
                xt_throw(self);
733
 
}
734
 
 
735
 
/* Record-file write used by the log-apply code in this build variant:
 * the record ID is converted to a file offset and written directly via
 * xres_write_rec(). Note that the expansion refers to a variable named
 * 'tab', which must be in scope at the point of use, and that the
 * arguments 'e' and 'sw' are accepted but not used.
 */
#define WRITE_REC_FILE(self, a, rec_id, off, c, d, e, sw)       xres_write_rec(self, a, xt_rec_id_to_rec_offset(tab, rec_id) + (off), c, d)
736
 
 
737
 
#endif
738
 
 
739
 
/* ----------------------------------------------------------------------
740
 
 * APPLYING CHANGES IN SEQUENCE
741
 
 */
742
 
 
743
 
typedef struct XTOperation {
744
 
        xtOpSeqNo                               or_op_seq;
745
 
        xtWord4                                 or_op_len;
746
 
        xtLogID                                 or_log_id;
747
 
        xtLogOffset                             or_log_offset;
748
 
} XTOperationRec, *XTOperationPtr;
749
 
 
750
 
static int xres_cmp_op_seq(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
751
 
{
752
 
        xtOpSeqNo               lf_op_seq = *((xtOpSeqNo *) a);
753
 
        XTOperationPtr  lf_ptr = (XTOperationPtr) b;
754
 
 
755
 
        if (lf_op_seq == lf_ptr->or_op_seq)
756
 
                return 0;
757
 
        if (XTTableSeq::xt_op_is_before(lf_op_seq, lf_ptr->or_op_seq))
758
 
                return -1;
759
 
        return 1;
760
 
}
761
 
 
762
 
xtPublic void xt_xres_init_tab(XTThreadPtr self, XTTableHPtr tab)
763
 
{
764
 
        tab->tab_op_list = xt_new_sortedlist(self, sizeof(XTOperationRec), 20, 1000, xres_cmp_op_seq, NULL, NULL, TRUE, FALSE);
765
 
}
766
 
 
767
 
xtPublic void xt_xres_exit_tab(XTThreadPtr self, XTTableHPtr tab)
768
 
{
769
 
        if (tab->tab_op_list) {
770
 
                xt_free_sortedlist(self, tab->tab_op_list);
771
 
                tab->tab_op_list = NULL;
772
 
        }
773
 
}
774
 
 
775
 
static xtBool xres_open_table(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)
776
 
{
777
 
        XTOpenTablePtr  ot;
778
 
 
779
 
        if ((ot = ws->ws_ot)) {
780
 
                if (ot->ot_table->tab_id == tab_id)
781
 
                        return OK;
782
 
                xt_db_return_table_to_pool(self, ot);
783
 
                ws->ws_ot = NULL;
784
 
        }
785
 
 
786
 
        /* Don't recover tables that cannot be found, or tables that 
787
 
         * temporary:
788
 
         */
789
 
        if (ws->ws_tab_gone == tab_id || ws->ws_tab_temp == tab_id)
790
 
                return FAILED;
791
 
 
792
 
        if ((ws->ws_ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE))) {
793
 
                XTTableHPtr             tab;
794
 
 
795
 
                tab = ws->ws_ot->ot_table;
796
 
                
797
 
                /* Don't recover temporary tables! */
798
 
                if (ws->ws_in_recover && XT_IS_TEMP_TABLE(tab->tab_dic.dic_tab_flags)) {
799
 
                        tab->tab_recovery_not_done = TRUE;
800
 
                        tab->tab_dic.dic_disable_index = XT_INDEX_NOT_RECOVERED;
801
 
                        xt_db_return_table_to_pool(self, ws->ws_ot);
802
 
                        ws->ws_ot = NULL;
803
 
                        ws->ws_tab_temp = tab_id;
804
 
                        return FAILED;
805
 
                }
806
 
 
807
 
                if (!tab->tab_ind_rec_log_id) {
808
 
                        /* Should not happen... */
809
 
                        tab->tab_ind_rec_log_id = ws->ws_ind_rec_log_id;
810
 
                        tab->tab_ind_rec_log_offset = ws->ws_ind_rec_log_offset;
811
 
                }
812
 
                return OK;
813
 
        }
814
 
 
815
 
        ws->ws_tab_gone = tab_id;
816
 
        return FAILED;
817
 
}
818
 
 
819
 
/* {INDEX-RECOV_ROWID}
820
 
 * Add missing index entries during recovery.
821
 
 * Set the row ID even if the index entry
822
 
 * is not committed. It will be removed later by
823
 
 * the sweeper.
824
 
 */
825
 
static xtBool xres_add_index_entries(XTOpenTablePtr ot, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_data)
826
 
{
827
 
        XTTableHPtr                     tab = ot->ot_table;
828
 
        u_int                           idx_cnt;
829
 
        XTIndexPtr                      *ind;
830
 
        //XTIdxSearchKeyRec     key;
831
 
 
832
 
        if (tab->tab_dic.dic_disable_index)
833
 
                return OK;
834
 
 
835
 
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
836
 
                if (!xt_idx_insert(ot, *ind, row_id, rec_id, rec_data, NULL, TRUE)) {
837
 
                        /* Check the error, certain errors are recoverable! */
838
 
                        XTThreadPtr self = xt_get_self();
839
 
 
840
 
                        if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
841
 
                                (XT_FILE_IN_USE(self->t_exception.e_sys_err) ||
842
 
                                 XT_FILE_ACCESS_DENIED(self->t_exception.e_sys_err) ||
843
 
                                 XT_FILE_TOO_MANY_OPEN(self->t_exception.e_sys_err) ||
844
 
                                 self->t_exception.e_sys_err == XT_ENOMEM)) {
845
 
                                ot->ot_err_index_no = (*ind)->mi_index_no;
846
 
                                return FAILED;
847
 
                        }
848
 
 
849
 
                        /* TODO: Write something to the index header to indicate that
850
 
                         * it is corrupted.
851
 
                         */
852
 
                        xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
853
 
                        xt_log_and_clear_exception_ns();
854
 
                        return OK;
855
 
                }
856
 
        }
857
 
        return OK;
858
 
}
859
 
 
860
 
static void xres_remove_index_entries(XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *rec_data)
861
 
{
862
 
        XTTableHPtr     tab = ot->ot_table;
863
 
        u_int           idx_cnt;
864
 
        XTIndexPtr      *ind;
865
 
 
866
 
        if (tab->tab_dic.dic_disable_index)
867
 
                return;
868
 
 
869
 
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
870
 
                if (!xt_idx_delete(ot, *ind, rec_id, rec_data))
871
 
                        xt_log_and_clear_exception_ns();
872
 
        }
873
 
}
874
 
 
875
 
static xtWord1 *xres_load_record(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *data, size_t red_size, XTInfoBufferPtr rec_buf, u_int cols_req)
876
 
{
877
 
        XTTableHPtr     tab = ot->ot_table;
878
 
        xtWord1         *rec_data;
879
 
 
880
 
        rec_data = ot->ot_row_rbuffer;
881
 
 
882
 
        ASSERT(red_size <= ot->ot_row_rbuf_size);
883
 
        ASSERT(tab->tab_dic.dic_rec_size <= ot->ot_row_rbuf_size);
884
 
        if (data) {
885
 
                if (rec_data != data)
886
 
                        memcpy(rec_data, data, red_size);
887
 
        }
888
 
        else {
889
 
                /* It can be that less than 'dic_rec_size' was written for
890
 
                 * variable length type records.
891
 
                 * If this is the last record in the file, then we will read
892
 
                 * less than actual record size.
893
 
                 */
894
 
                if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, rec_data, &red_size, &self->st_statistics.st_rec, self))
895
 
                        goto failed;
896
 
                
897
 
                if (red_size < sizeof(XTTabRecHeadDRec))
898
 
                        return NULL;
899
 
        }
900
 
        
901
 
        if (XT_REC_IS_FIXED(rec_data[0]))
902
 
                rec_data = ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE;
903
 
        else {
904
 
                if (!xt_ib_alloc(NULL, rec_buf, tab->tab_dic.dic_mysql_buf_size))
905
 
                        goto failed;
906
 
                if (XT_REC_IS_VARIABLE(rec_data[0])) {
907
 
                        if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
908
 
                                goto failed;
909
 
                }
910
 
                else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
911
 
                        if (red_size < XT_REC_EXT_HEADER_SIZE)
912
 
                                return NULL;
913
 
 
914
 
                        ASSERT(cols_req);
915
 
                        if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
916
 
                                if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
917
 
                                        goto failed;
918
 
                        }
919
 
                        else {
920
 
                                if (!xt_tab_load_ext_data(ot, rec_id, rec_buf->ib_db.db_data, cols_req))
921
 
                                        goto failed;
922
 
                        }
923
 
                }
924
 
                else
925
 
                        /* This is possible, the record has already been cleaned up. */
926
 
                        return NULL;
927
 
                rec_data = rec_buf->ib_db.db_data;
928
 
        }
929
 
 
930
 
        return rec_data;
931
 
 
932
 
        failed:
933
 
        /* Running out of memory should not be ignored. */
934
 
        if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
935
 
                self->t_exception.e_sys_err == XT_ENOMEM)
936
 
                xt_throw(self);
937
 
        xt_log_and_clear_exception_ns();
938
 
        return NULL;
939
 
}
940
 
 
941
 
/*
942
 
 * Apply a change from the log.
943
 
 *
944
 
 * This function is basically very straight forward, were it not
945
 
 * for the option to apply operations out of sequence.
946
 
 * (i.e. in_sequence == FALSE)
947
 
 *
948
 
 * If operations are applied in sequence, then they can be
949
 
 * applied blindly. The update operation is just executed as
950
 
 * it was logged.
951
 
 *
952
 
 * If the changes are not in sequence, then some operations are missing,
953
 
 * however, the operations that are present are in the correct order.
954
 
 *
955
 
 * This can only happen at the end of recovery!!!
956
 
 * After we have applied all operations in the log we may be
957
 
 * left with some operations that have not been applied
958
 
 * because operations were logged out of sequence.
959
 
 *
960
 
 * The application of these operations therefore has to take into
961
 
 * account the current state of the database.
962
 
 * They are then applied in a manner that maintains the
963
 
 * database consistency.
964
 
 *
965
 
 * For example, a record that is freed, is freed by placing it
966
 
 * on the current free list. Part of the data logged for the
967
 
 * operation is ignored. Namely: the "next block" pointer
968
 
 * that was originally written into the freed record.
969
 
 */
970
 
static void xres_apply_change(XTThreadPtr self, XTWriterStatePtr ws, XTXactLogBufferDPtr record, xtBool in_sequence, xtBool check_index, xtOpSeqNo op_seq)
971
 
{
972
 
        XTOpenTablePtr          ot = ws->ws_ot;
973
 
        XTInfoBufferPtr         rec_buf = &ws->ws_rec_buf;
974
 
        XTTableHPtr                     tab = ot->ot_table;
975
 
        size_t                          len;
976
 
        xtRecordID                      rec_id;
977
 
        xtRefID                         free_ref_id;
978
 
        XTTabRecFreeDRec        free_rec;
979
 
        xtRowID                         row_id;
980
 
        XTTabRowRefDRec         row_buf;
981
 
        XTTabRecHeadDRec        rec_head;
982
 
        size_t                          tfer;
983
 
        xtRecordID                      link_rec_id, prev_link_rec_id;
984
 
        xtWord1                         *rec_data = NULL;
985
 
        XTTabRecFreeDPtr        free_data;
986
 
        u_int                           status;
987
 
 
988
 
        ASSERT(ot->ot_thread == self);
989
 
        if (tab->tab_dic.dic_key_count == 0)
990
 
                check_index = FALSE;
991
 
 
992
 
        status = record->xl.xl_status_1;
993
 
        switch (status) {
994
 
                case XT_LOG_ENT_REC_MODIFIED:
995
 
                case XT_LOG_ENT_UPDATE:
996
 
                case XT_LOG_ENT_INSERT:
997
 
                case XT_LOG_ENT_DELETE:
998
 
                case XT_LOG_ENT_UPDATE_BG:
999
 
                case XT_LOG_ENT_INSERT_BG:
1000
 
                case XT_LOG_ENT_DELETE_BG:
1001
 
                        rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
1002
 
 
1003
 
                        /* This should be done before we apply change to table, as otherwise we lose
1004
 
                         * the key value that we need to remove from index
1005
 
                         */
1006
 
                        if (check_index && status == XT_LOG_ENT_REC_MODIFIED) {
1007
 
                                if ((rec_data = xres_load_record(self, ot, rec_id, NULL, 0, rec_buf, tab->tab_dic.dic_ind_cols_req)))
1008
 
                                        xres_remove_index_entries(ot, rec_id, rec_data);                        
1009
 
                        }
1010
 
 
1011
 
                        len = (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
1012
 
                        WRITE_REC_FILE(self, ot, rec_id, 0, len, (xtWord1 *) &record->xu.xu_rec_type_1, in_sequence, ws);
1013
 
                        tab->tab_bytes_to_flush += len;
1014
 
 
1015
 
                        if (check_index) {
1016
 
                                switch (status) {
1017
 
                                        case XT_LOG_ENT_DELETE:
1018
 
                                        case XT_LOG_ENT_DELETE_BG:
1019
 
                                                break;
1020
 
                                        default:
1021
 
                                                if ((rec_data = xres_load_record(self, ot, rec_id, &record->xu.xu_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
1022
 
                                                        row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
1023
 
                                                        if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
1024
 
                                                                xt_throw(self);
1025
 
                                                }
1026
 
                                                break;
1027
 
                                }
1028
 
                        }
1029
 
 
1030
 
                        if (!in_sequence) {
1031
 
                                /* A record has been allocated from the EOF, but out of sequence.
1032
 
                                 * This could leave a gap where other records were allocated
1033
 
                                 * from the EOF, but those operations have been lost!
1034
 
                                 * We compensate for this by adding all blocks between
1035
 
                                 * to the free list.
1036
 
                                 */
1037
 
                                free_rec.rf_rec_type_1 = XT_TAB_STATUS_FREED;
1038
 
                                free_rec.rf_not_used_1 = 0;
1039
 
                                while (tab->tab_head_rec_eof_id < rec_id) {
1040
 
                                        XT_SET_DISK_4(free_rec.rf_next_rec_id_4, tab->tab_head_rec_free_id);
1041
 
                                        if (!xt_tab_write_rec(ot, tab->tab_head_rec_eof_id, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec))
1042
 
                                                xt_throw(self);
1043
 
                                        tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
1044
 
                                        tab->tab_head_rec_free_id = tab->tab_head_rec_eof_id;
1045
 
                                        tab->tab_head_rec_eof_id++;
1046
 
                                }
1047
 
                        }
1048
 
                        if (tab->tab_head_rec_eof_id < rec_id + 1)
1049
 
                                tab->tab_head_rec_eof_id = rec_id + 1;
1050
 
                        tab->tab_flush_pending = TRUE;
1051
 
                        break;
1052
 
                case XT_LOG_ENT_UPDATE_FL:
1053
 
                case XT_LOG_ENT_INSERT_FL:
1054
 
                case XT_LOG_ENT_DELETE_FL:
1055
 
                case XT_LOG_ENT_UPDATE_FL_BG:
1056
 
                case XT_LOG_ENT_INSERT_FL_BG:
1057
 
                case XT_LOG_ENT_DELETE_FL_BG:
1058
 
                        rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
1059
 
                        len = (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
1060
 
                        free_ref_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);
1061
 
 
1062
 
                        if (check_index &&
1063
 
                                record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL &&
1064
 
                                record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL_BG) {
1065
 
                                if ((rec_data = xres_load_record(self, ot, rec_id, &record->xf.xf_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
1066
 
                                        row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
1067
 
                                        if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
1068
 
                                                xt_throw(self);
1069
 
                                }
1070
 
                        }
1071
 
 
1072
 
                        if (!in_sequence) {
1073
 
                                /* This record was allocated from the free list.
1074
 
                                 * Because this operation is out of sequence, there
1075
 
                                 * could have been other allocations from the
1076
 
                                 * free list before this, that have gone missing.
1077
 
                                 * For this reason we have to search the current
1078
 
                                 * free list and remove the record.
1079
 
                                 */
1080
 
                                link_rec_id = tab->tab_head_rec_free_id;
1081
 
                                prev_link_rec_id = 0;
1082
 
                                while (link_rec_id) {
1083
 
                                        if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecFreeDRec), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, NULL, &self->st_statistics.st_rec, self))
1084
 
                                                xt_throw(self);
1085
 
                                        if (link_rec_id == rec_id)
1086
 
                                                break;
1087
 
                                        prev_link_rec_id = link_rec_id;
1088
 
                                        link_rec_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
1089
 
                                }
1090
 
                                if (link_rec_id == rec_id) {
1091
 
                                        /* The block was found on the free list.
1092
 
                                         * remove it: */
1093
 
                                        if (prev_link_rec_id) {
1094
 
                                                /* We write the record from position 'link_rec_id' into
1095
 
                                                 * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
1096
 
                                                 */
1097
 
                                                if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, prev_link_rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec))
1098
 
                                                        xt_throw(self);
1099
 
                                                tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
1100
 
                                                free_ref_id = tab->tab_head_rec_free_id;
1101
 
                                        }
1102
 
                                        else
1103
 
                                                /* The block is at the front of the list: */
1104
 
                                                free_ref_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
1105
 
                                }
1106
 
                                else {
1107
 
                                        /* Not found on the free list? */
1108
 
                                        if (tab->tab_head_rec_eof_id < rec_id + 1)
1109
 
                                                tab->tab_head_rec_eof_id = rec_id + 1;
1110
 
                                        goto write_mod_data;
1111
 
                                }
1112
 
                        }
1113
 
                        if (tab->tab_head_rec_eof_id < rec_id + 1)
1114
 
                                tab->tab_head_rec_eof_id = rec_id + 1;
1115
 
                        tab->tab_head_rec_free_id = free_ref_id;
1116
 
                        tab->tab_head_rec_fnum--;
1117
 
                        write_mod_data:
1118
 
                        WRITE_REC_FILE(self, ot, rec_id, 0, len, (xtWord1 *) &record->xf.xf_rec_type_1, in_sequence, ws);
1119
 
                        tab->tab_bytes_to_flush += len;
1120
 
                        tab->tab_flush_pending = TRUE;
1121
 
                        break;
1122
 
                case XT_DEFUNKT_REC_REMOVED:
1123
 
                case XT_DEFUNKT_REC_REMOVED_EXT: {
1124
 
                        xtBool                  record_loaded;
1125
 
                        XTTabRecExtDPtr ext_rec;
1126
 
                        size_t                  red_size;
1127
 
                        xtWord4                 log_over_size = 0;
1128
 
                        xtLogID                 data_log_id = 0;
1129
 
                        xtLogOffset             data_log_offset = 0;
1130
 
                        u_int                   cols_required = 0;
1131
 
 
1132
 
                        rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
1133
 
                        free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
1134
 
 
1135
 
                        /* This is a short-cut, it does not require loading the record: */
1136
 
                        if (!check_index && !tab->tab_dic.dic_blob_count && record->fr.fr_status_1 != XT_DEFUNKT_REC_REMOVED_EXT)
1137
 
                                goto do_rec_freed;
1138
 
 
1139
 
                        ext_rec = (XTTabRecExtDPtr) ot->ot_row_rbuffer;
1140
 
 
1141
 
                        if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, ext_rec, &red_size, &self->st_statistics.st_rec, self)) {
1142
 
                                xt_log_and_clear_exception_ns();
1143
 
                                goto do_rec_freed;
1144
 
                        }
1145
 
 
1146
 
                        if (red_size < sizeof(XTTabRecHeadDRec))
1147
 
                                goto do_rec_freed;
1148
 
 
1149
 
                        /* Check that the record is the same as the one originally removed.
1150
 
                         * This can be different if recovery is repeated.
1151
 
                         * For example:
1152
 
                         * 
1153
 
                         * log=21 offset=6304472 REMOVED-X REC op=360616 tab=7 rec=25874 
1154
 
                         * log=21 offset=6309230 UPDATE-FL op=360618 tab=7 rec=25874 row=26667 log=1 offset=26503077 xact=209 
1155
 
                         * log=21 offset=6317500 CLEAN REC op=360631 tab=7 rec=25874 
1156
 
                         * 
1157
 
                         * If this recovery sequence is repeated, then the REMOVED-X will free the
1158
 
                         * extended record belonging to the update that came afterwards!
1159
 
                         *
1160
 
                         * Additional situation to consider:
1161
 
                         *
1162
 
                         * - A record "x" is created, and index entries created.
1163
 
                         * - A checkpoint is made done.
1164
 
                         * - Record "x" is deleted due to UPDATE.
1165
 
                         * - The index entries are removed, but the index is not
1166
 
                         *   flushed.
1167
 
                         * - This deletion is written to disk by the writer.
1168
 
                         * So we have the situation that the remove is on disk,
1169
 
                         * but the index changes have not been made.
1170
 
                         *
1171
 
                         * In this case, skipping to "do_rec_freed" is incorrect.
1172
 
                         */
1173
 
                        if (record->fr.fr_stat_id_1 != ext_rec->tr_stat_id_1 ||
1174
 
                                XT_GET_DISK_4(record->fr.fr_xact_id_4) != XT_GET_DISK_4(ext_rec->tr_xact_id_4))
1175
 
                                goto dont_remove_x_record;
1176
 
 
1177
 
                        if (status == XT_DEFUNKT_REC_REMOVED_EXT) {
1178
 
                                if (!XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1))
1179
 
                                        goto dont_remove_x_record;
1180
 
                                if (red_size < offsetof(XTTabRecExtDRec, re_data))
1181
 
                                        goto dont_remove_x_record;
1182
 
 
1183
 
                                /* Save this for later (can be overwritten by xres_load_record(): */
1184
 
                                data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
1185
 
                                data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
1186
 
                                log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
1187
 
                        }
1188
 
                        dont_remove_x_record:
1189
 
 
1190
 
                        record_loaded = FALSE;
1191
 
 
1192
 
                        if (check_index) {
1193
 
                                cols_required = tab->tab_dic.dic_ind_cols_req;
1194
 
                                if (tab->tab_dic.dic_blob_cols_req > cols_required)
1195
 
                                        cols_required = tab->tab_dic.dic_blob_cols_req;
1196
 
                                if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
1197
 
                                        goto do_rec_freed;
1198
 
                                record_loaded = TRUE;
1199
 
                                xres_remove_index_entries(ot, rec_id, rec_data);
1200
 
                        }
1201
 
 
1202
 
                        if (tab->tab_dic.dic_blob_count) {
1203
 
                                if (!record_loaded) {
1204
 
                                        if (tab->tab_dic.dic_blob_cols_req > cols_required)
1205
 
                                                cols_required = tab->tab_dic.dic_blob_cols_req;
1206
 
                                        if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
1207
 
                                                /* [(7)] REMOVE is followed by FREE:
1208
 
                                                goto get_rec_offset;
1209
 
                                                */
1210
 
                                                goto do_rec_freed;
1211
 
                                        record_loaded = TRUE;
1212
 
                                }
1213
 
                        }
1214
 
 
1215
 
                        if (status == XT_DEFUNKT_REC_REMOVED_EXT) {
1216
 
                                /* Note: dlb_delete_log() may be repeated, but should handle this:
1217
 
                                 * 
1218
 
                                 * Example:
1219
 
                                 * log=5 offset=213334 CLEAN REC op=28175 tab=1 rec=317428 
1220
 
                                 * ...
1221
 
                                 * log=6 offset=321063 REMOVED-X REC op=33878 tab=1 rec=317428 
1222
 
                                 *
1223
 
                                 * When this sequence is repeated during recovery, then CLEAN REC
1224
 
                                 * will reset the status byte of the record so that it
1225
 
                                 * comes back to here!
1226
 
                                 *
1227
 
                                 * The check for zero is probably not required here.
1228
 
                                 */
1229
 
                                if (data_log_id && data_log_offset && log_over_size) {
1230
 
                                        if (ot->ot_table->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE)
1231
 
                                                xt_tab_free_ext_slot(tab, data_log_id, data_log_offset, log_over_size);
1232
 
                                        else {
1233
 
                                                if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
1234
 
                                                        if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
1235
 
                                                                ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
1236
 
                                                                xt_log_and_clear_exception_ns();
1237
 
                                                }
1238
 
                                        }
1239
 
                                }
1240
 
                        }
1241
 
 
1242
 
                        goto do_rec_freed;
1243
 
                }
1244
 
                case XT_LOG_ENT_REC_REMOVED_BI:
1245
 
                case XT_LOG_ENT_REC_REMOVED_BI_L: 
1246
 
                {
1247
 
                        /*
1248
 
                         * For deletion we need the complete before image because of the following problem.
1249
 
                         *
1250
 
                         * DROP TABLE IF EXISTS t1;
1251
 
                         * CREATE TABLE t1 (ID int primary key auto_increment, value int, index (value)) engine=pbxt;
1252
 
                         * 
1253
 
                         * insert t1(value) values(50);
1254
 
                         * 
1255
 
                         * -- CHECKPOINT --
1256
 
                         * 
1257
 
                         * update t1 set value = 60;
1258
 
                         * 
1259
 
                         * -- PAUSE --
1260
 
                         * 
1261
 
                         * update t1 set value = 70;
1262
 
                         * 
1263
 
                         * -- CRASH --
1264
 
                         * 
1265
 
                         * select value from t1;
1266
 
                         * select * from t1;
1267
 
                         * 
1268
 
                         * 081203 12:11:46 [Note] PBXT: Recovering from 1-148, bytes to read: 33554284
1269
 
                         * log=1 offset=148 UPDATE-BG op=5 tab=1 rec=2 row=1 xact=3 
1270
 
                         * log=1 offset=188 REC ADD ROW op=6 tab=1 row=1 
1271
 
                         * log=1 offset=206 COMMIT xact=3 
1272
 
                         * log=1 offset=216 REMOVED REC op=7 tab=1 rec=1 xact=2 
1273
 
                         * log=1 offset=241 CLEAN REC op=8 tab=1 rec=2 
1274
 
                         * log=1 offset=261 CLEANUP xact=3 
1275
 
                         * log=1 offset=267 UPDATE-FL-BG op=9 tab=1 rec=1 row=1 xact=4 
1276
 
                         * log=1 offset=311 REC ADD ROW op=10 tab=1 row=1 
1277
 
                         * log=1 offset=329 COMMIT xact=4 
1278
 
                         * log=1 offset=339 REMOVED REC op=11 tab=1 rec=2 xact=3 
1279
 
                         * log=1 offset=364 CLEAN REC op=12 tab=1 rec=1 
1280
 
                         * log=1 offset=384 CLEANUP xact=4 
1281
 
                         * 081203 12:12:15 [Note] PBXT: Recovering complete at 1-390, bytes read: 33554284
1282
 
                         * 
1283
 
                         * mysql> select value from t1;
1284
 
                         * +-------+
1285
 
                         * | value |
1286
 
                         * +-------+
1287
 
                         * |    50 | 
1288
 
                         * |    70 | 
1289
 
                         * +-------+
1290
 
                         * 2 rows in set (55.99 sec)
1291
 
                         * 
1292
 
                         * mysql> select * from t1;
1293
 
                         * +----+-------+
1294
 
                         * | ID | value |
1295
 
                         * +----+-------+
1296
 
                         * |  1 |    70 | 
1297
 
                         * +----+-------+
1298
 
                         * 1 row in set (0.00 sec)
1299
 
                         */
1300
 
                        XTTabRecExtDPtr ext_rec;
1301
 
                        xtWord4                 log_over_size = 0;
1302
 
                        xtLogID                 data_log_id = 0;
1303
 
                        xtLogOffset             data_log_offset = 0;
1304
 
                        u_int                   cols_required = 0;
1305
 
                        xtBool                  record_loaded;
1306
 
                        size_t                  rec_size;               
1307
 
 
1308
 
                        rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
1309
 
                        rec_size = XT_GET_DISK_2(record->rb.rb_size_2);
1310
 
 
1311
 
                        if (status == XT_LOG_ENT_REC_REMOVED_BI_L)
1312
 
                                ext_rec = (XTTabRecExtDPtr) &record->bl.bl_rec_type_1;
1313
 
                        else
1314
 
                                ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
1315
 
 
1316
 
                        if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
1317
 
                                /* Save this for later (can be overwritten by xres_load_record()): */
1318
 
                                data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
1319
 
                                data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
1320
 
                                log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
1321
 
                        }
1322
 
 
1323
 
                        record_loaded = FALSE;
1324
 
 
1325
 
                        if (check_index) {
1326
 
                                cols_required = tab->tab_dic.dic_ind_cols_req;
1327
 
                                if (!(rec_data = xres_load_record(self, ot, rec_id, (xtWord1 *) ext_rec, rec_size, rec_buf, cols_required)))
1328
 
                                        goto go_on_to_free;
1329
 
                                record_loaded = TRUE;
1330
 
                                xres_remove_index_entries(ot, rec_id, rec_data);
1331
 
                        }
1332
 
 
1333
 
                        if (data_log_id && data_log_offset && log_over_size) {
1334
 
                                if (ot->ot_table->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE)
1335
 
                                        xt_tab_free_ext_slot(tab, data_log_id, data_log_offset, log_over_size);
1336
 
                                else {
1337
 
                                        if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
1338
 
                                                if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
1339
 
                                                        ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
1340
 
                                                        xt_log_and_clear_exception_ns();
1341
 
                                        }
1342
 
                                }
1343
 
                        }
1344
 
 
1345
 
                        go_on_to_free:
1346
 
                        /* Use the new record type: */
1347
 
                        ext_rec->tr_rec_type_1 = record->rb.rb_new_rec_type_1;
1348
 
                        free_data = (XTTabRecFreeDPtr) ext_rec;
1349
 
 
1350
 
                        if (status == XT_LOG_ENT_REC_REMOVED_BI)
1351
 
                                goto do_rec_freed;
1352
 
 
1353
 
                        if (!in_sequence)
1354
 
                                goto do_rec_freed;
1355
 
 
1356
 
                        XTTabRecFreeDRec        prev_free_rec;
1357
 
                        xtRecordID                      prev_rec_id;
1358
 
 
1359
 
                        prev_rec_id = XT_GET_DISK_4(record->bl.bl_prev_rec_id_4);
1360
 
                        XT_SET_DISK_4(prev_free_rec.rf_next_rec_id_4, rec_id);
1361
 
                        WRITE_REC_FILE(self, ot, prev_rec_id, offsetof(XTTabRecFreeDRec, rf_next_rec_id_4), 4, prev_free_rec.rf_next_rec_id_4, in_sequence, ws);
1362
 
                        tab->tab_bytes_to_flush += 4;
1363
 
                        tab->tab_flush_pending = TRUE;
1364
 
                        
1365
 
                        tab->tab_head_rec_fnum++;
1366
 
                        WRITE_REC_FILE(self, ot, rec_id, 0, sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, in_sequence, ws);
1367
 
                        tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
1368
 
                        tab->tab_flush_pending = TRUE;
1369
 
                        break;
1370
 
                }
1371
 
                case XT_DEFUNKT_REC_FREED:
1372
 
                        rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
1373
 
                        free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
1374
 
                        do_rec_freed:
1375
 
                        if (!in_sequence) {
1376
 
                                size_t  red_size;
1377
 
 
1378
 
                                /* Free the record.
1379
 
                                 * We place the record on front of the current
1380
 
                                 * free list.
1381
 
                                 *
1382
 
                                 * However, before we do this, we remove the record
1383
 
                                 * from its row list, if the record is on a row list.
1384
 
                                 *
1385
 
                                 * We do this here, because the normal removal
1386
 
                                 * from the row list uses the operations:
1387
 
                                 *
1388
 
                                 * XT_LOG_ENT_REC_UNLINKED, XT_LOG_ENT_ROW_SET and
1389
 
                                 * XT_LOG_ENT_ROW_FREED.
1390
 
                                 *
1391
 
                                 * When operations are performed out of sequence,
1392
 
                                 * these operations are ignored for the purpose
1393
 
                                 * of removing the record from the row.
1394
 
                                 */
1395
 
                                if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, NULL, &self->st_statistics.st_rec, self))
1396
 
                                        xt_throw(self);
1397
 
                                /* The record is already free: */
1398
 
                                if (XT_REC_IS_FREE(rec_head.tr_rec_type_1))
1399
 
                                        goto free_done;
1400
 
                                row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
1401
 
 
1402
 
                                /* Search the row for this record: */
1403
 
                                if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, NULL, &self->st_statistics.st_rec, self))
1404
 
                                        xt_throw(self);
1405
 
                                link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1406
 
                                prev_link_rec_id = 0;
1407
 
                                while (link_rec_id) {
1408
 
                                        if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &red_size, &self->st_statistics.st_rec, self)) {
1409
 
                                                xt_log_and_clear_exception(self);
1410
 
                                                break;
1411
 
                                        }
1412
 
                                        if (red_size < sizeof(XTTabRecHeadDRec))
1413
 
                                                break;
1414
 
                                        if (link_rec_id == rec_id)
1415
 
                                                break;
1416
 
                                        if (XT_GET_DISK_4(rec_head.tr_row_id_4) != row_id)
1417
 
                                                break;
1418
 
                                        switch (rec_head.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
1419
 
                                                case XT_TAB_STATUS_FREED:
1420
 
                                                        break;
1421
 
                                                case XT_TAB_STATUS_DELETE:
1422
 
                                                case XT_TAB_STATUS_FIXED:
1423
 
                                                case XT_TAB_STATUS_VARIABLE:
1424
 
                                                case XT_TAB_STATUS_EXT_DLOG:
1425
 
                                                        break;
1426
 
                                                default:
1427
 
                                                        ASSERT(FALSE);
1428
 
                                                        goto exit_loop;
1429
 
                                        }
1430
 
                                        if (rec_head.tr_rec_type_1 & ~(XT_TAB_STATUS_CLEANED_BIT | XT_TAB_STATUS_MASK)) {
1431
 
                                                ASSERT(FALSE);
1432
 
                                                break;
1433
 
                                        }
1434
 
                                        prev_link_rec_id = link_rec_id;
1435
 
                                        link_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1436
 
                                }
1437
 
 
1438
 
                                exit_loop:
1439
 
                                if (link_rec_id == rec_id) {
1440
 
                                        /* The record was found on the row list, remove it: */
1441
 
                                        if (prev_link_rec_id) {
1442
 
                                                /* We write the previous variation pointer from position 'link_rec_id' into
1443
 
                                                 * variation pointer of the 'prev_link_rec_id' record. This unlinks 'link_rec_id'!
1444
 
                                                 */
1445
 
                                                if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, prev_link_rec_id) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4), XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head.tr_prev_rec_id_4))
1446
 
                                                        xt_throw(self);
1447
 
                                                tab->tab_bytes_to_flush += XT_RECORD_ID_SIZE;
1448
 
                                        }
1449
 
                                        else {
1450
 
                                                /* The record is at the front of the row list: */
1451
 
                                                xtRefID ref_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1452
 
                                                XT_SET_DISK_4(row_buf.rr_ref_id_4, ref_id);
1453
 
                                                if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf))
1454
 
                                                        xt_throw(self);
1455
 
                                                tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1456
 
                                        }
1457
 
                                }                               
1458
 
 
1459
 
                                /* Now we free the record, by placing it at the front of
1460
 
                                 * the free list:
1461
 
                                 */
1462
 
                                XT_SET_DISK_4(free_data->rf_next_rec_id_4, tab->tab_head_rec_free_id);                          
1463
 
                        }
1464
 
                        tab->tab_head_rec_free_id = rec_id;
1465
 
                        tab->tab_head_rec_fnum++;
1466
 
                        WRITE_REC_FILE(self, ot, rec_id, 0, sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, in_sequence, ws);
1467
 
                        tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
1468
 
                        tab->tab_flush_pending = TRUE;
1469
 
                        free_done:
1470
 
                        break;
1471
 
                case XT_LOG_ENT_REC_MOVED:
1472
 
                        len = 8;
1473
 
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
1474
 
                        WRITE_REC_FILE(self, ot, rec_id, offsetof(XTTabRecExtDRec, re_log_id_2), len, (xtWord1 *) &record->xw.xw_rec_type_1, in_sequence, ws);
1475
 
                        tab->tab_bytes_to_flush += len;
1476
 
                        tab->tab_flush_pending = TRUE;
1477
 
                        break;
1478
 
                case XT_LOG_ENT_REC_CLEANED:
1479
 
                        len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1480
 
                        goto get_rec_offset;
1481
 
                case XT_LOG_ENT_REC_CLEANED_1:
1482
 
                        len = 1;
1483
 
                        goto get_rec_offset;
1484
 
                case XT_LOG_ENT_REC_UNLINKED:
1485
 
                        if (!in_sequence) {
1486
 
                                /* Unlink the record.
1487
 
                                 * This is done when the record is freed.
1488
 
                                 */
1489
 
                                break;
1490
 
                        }
1491
 
                        len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1492
 
                        get_rec_offset:
1493
 
                        rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
1494
 
                        WRITE_REC_FILE(self, ot, rec_id, 0, len, (xtWord1 *) &record->xw.xw_rec_type_1, in_sequence, ws);
1495
 
                        tab->tab_bytes_to_flush += len;
1496
 
                        tab->tab_flush_pending = TRUE;
1497
 
                        break;
1498
 
                case XT_LOG_ENT_ROW_NEW:
1499
 
                        len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
1500
 
                        row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
1501
 
                        if (!in_sequence) {
1502
 
                                /* A row was allocated from the EOF. Because operations are missing.
1503
 
                                 * The blocks between the current EOF and the new EOF need to be
1504
 
                                 * placed on the free list!
1505
 
                                 */                             
1506
 
                                while (tab->tab_head_row_eof_id < row_id) {
1507
 
                                        XT_SET_DISK_4(row_buf.rr_ref_id_4, tab->tab_head_row_free_id);
1508
 
                                        if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, tab->tab_head_row_eof_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf))
1509
 
                                                xt_throw(self);
1510
 
                                        tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1511
 
                                        tab->tab_head_row_free_id = tab->tab_head_row_eof_id;
1512
 
                                        tab->tab_head_row_eof_id++;
1513
 
                                }
1514
 
                        }
1515
 
                        if (tab->tab_head_row_eof_id < row_id + 1)
1516
 
                                tab->tab_head_row_eof_id = row_id + 1;
1517
 
                        tab->tab_flush_pending = TRUE;
1518
 
                        break;
1519
 
                case XT_LOG_ENT_ROW_NEW_FL:
1520
 
                        len = sizeof(XTactRowAddedEntryDRec);
1521
 
                        row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
1522
 
                        free_ref_id = XT_GET_DISK_4(record->xa.xa_free_list_4);
1523
 
                        if (!in_sequence) {
1524
 
                                size_t red_size;
1525
 
                                /* The record was taken from the free list.
1526
 
                                 * If the operations were in sequence, then this would be
1527
 
                                 * the front of the free list now.
1528
 
                                 * However, because operations are missing, it may no
1529
 
                                 * longer be the front of the free list!
1530
 
                                 * Search and remove:
1531
 
                                 */
1532
 
                                link_rec_id = tab->tab_head_row_free_id;
1533
 
                                prev_link_rec_id = 0;
1534
 
                                while (link_rec_id) {
1535
 
                                        if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, link_rec_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &red_size, &self->st_statistics.st_rec, self)) {
1536
 
                                                xt_log_and_clear_exception(self);
1537
 
                                                break;
1538
 
                                        }
1539
 
                                        if (red_size < sizeof(XTTabRowRefDRec))
1540
 
                                                break;
1541
 
                                        if (link_rec_id == row_id)
1542
 
                                                break;
1543
 
                                        prev_link_rec_id = link_rec_id;
1544
 
                                        link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1545
 
                                }
1546
 
                                if (link_rec_id == row_id) {
1547
 
                                        /* The block was found on the free list, remove it: */
1548
 
                                        if (prev_link_rec_id) {
1549
 
                                                /* We write the record from position 'link_rec_id' into
1550
 
                                                 * position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
1551
 
                                                 */
1552
 
                                                if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, prev_link_rec_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf))
1553
 
                                                        xt_throw(self);
1554
 
                                                tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1555
 
                                                free_ref_id = tab->tab_head_row_free_id;
1556
 
                                        }
1557
 
                                        else
1558
 
                                                /* The block is at the front of the free list: */
1559
 
                                                free_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1560
 
                                }
1561
 
                                else {
1562
 
                                        /* Not found? */
1563
 
                                        if (tab->tab_head_row_eof_id < row_id + 1)
1564
 
                                                tab->tab_head_row_eof_id = row_id + 1;
1565
 
                                        break;
1566
 
                                }
1567
 
                                        
1568
 
                        }
1569
 
                        if (tab->tab_head_row_eof_id < row_id + 1)
1570
 
                                tab->tab_head_row_eof_id = row_id + 1;
1571
 
                        tab->tab_head_row_free_id = free_ref_id;
1572
 
                        tab->tab_head_row_fnum--;
1573
 
                        tab->tab_flush_pending = TRUE;
1574
 
                        break;
1575
 
                case XT_LOG_ENT_ROW_FREED:
1576
 
                        row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1577
 
                        if (!in_sequence) {
1578
 
                                /* Free the row.
1579
 
                                 * Since this operation is being performed out of sequence, we
1580
 
                                 * must assume that some other free and allocation operations
1581
 
                                 * must be missing.
1582
 
                                 * For this reason, we add the row to the front of the
1583
 
                                 * existing free list.
1584
 
                                 */
1585
 
                                XT_SET_DISK_4(record->wr.wr_ref_id_4, tab->tab_head_row_free_id);
1586
 
                        }
1587
 
                        tab->tab_head_row_free_id = row_id;
1588
 
                        tab->tab_head_row_fnum++;
1589
 
                        goto write_row_data;
1590
 
                case XT_LOG_ENT_ROW_ADD_REC:
1591
 
                        row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1592
 
                        if (!in_sequence) {
1593
 
                                if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &tfer, &self->st_statistics.st_rec, self))
1594
 
                                        xt_throw(self);
1595
 
                                if (tfer == sizeof(XTTabRowRefDRec)) {
1596
 
                                        /* Add a record to the front of the row.
1597
 
                                         * This is easy, but we have to make sure that the next
1598
 
                                         * pointer in the record is correct.
1599
 
                                         */
1600
 
                                        rec_id = XT_GET_DISK_4(record->wr.wr_ref_id_4);
1601
 
                                        if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &tfer, &self->st_statistics.st_rec, self))
1602
 
                                                xt_throw(self);
1603
 
                                        if (tfer == sizeof(XTTabRecHeadDRec) && XT_GET_DISK_4(rec_head.tr_row_id_4) == row_id) {
1604
 
                                                /* This is now the correct next pointer: */
1605
 
                                                xtRecordID next_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1606
 
                                                if (XT_GET_DISK_4(rec_head.tr_prev_rec_id_4) != next_ref_id &&
1607
 
                                                        rec_id != next_ref_id) {
1608
 
                                                        XT_SET_DISK_4(rec_head.tr_prev_rec_id_4, next_ref_id);
1609
 
                                                        if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
1610
 
                                                                xt_throw(self);
1611
 
                                                        tab->tab_bytes_to_flush += sizeof(XTTabRecHeadDRec);
1612
 
                                                }
1613
 
                                        }
1614
 
                                }
1615
 
                        }
1616
 
                        goto write_row_data;
1617
 
                case XT_LOG_ENT_ROW_SET:
1618
 
                        if (!in_sequence)
1619
 
                                /* This operation is ignored when out of sequence!
1620
 
                                 * The operation is used to remove a record from a row.
1621
 
                                 * This is done automatically when the record is freed.
1622
 
                                 */
1623
 
                                break;
1624
 
                        row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1625
 
                        write_row_data:
1626
 
                        ASSERT_NS(XT_GET_DISK_4(record->wr.wr_ref_id_4) < tab->tab_head_rec_eof_id);
1627
 
                        if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &record->wr.wr_ref_id_4))
1628
 
                                xt_throw(self);
1629
 
                        tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1630
 
                        if (tab->tab_head_row_eof_id < row_id + 1)
1631
 
                                tab->tab_head_row_eof_id = row_id + 1;
1632
 
                        tab->tab_flush_pending = TRUE;
1633
 
                        break;
1634
 
                case XT_LOG_ENT_NO_OP:
1635
 
                case XT_LOG_ENT_END_OF_LOG:
1636
 
                        break;
1637
 
        }
1638
 
 
1639
 
        /* Maybe flush? */
1640
 
#ifdef XT_REC_FLUSH_THRESHOLD
1641
 
        if (self->st_statistics.st_rec.ts_write - tab->tab_rec_wr_last_flush > XT_REC_FLUSH_THRESHOLD) {
1642
 
                tab->tab_rec_wr_last_flush = self->st_statistics.st_rec.ts_write;
1643
 
                
1644
 
                if (!xt_async_flush_record_row(tab, FALSE, self))
1645
 
                        xt_throw(self);
1646
 
        }
1647
 
#endif
1648
 
 
1649
 
#ifdef XT_SORT_REC_WRITES
1650
 
        if (in_sequence && xt_sl_get_size(tab->tab_rec_dw_writes))
1651
 
                tab->tab_rec_dw_op_seq = op_seq;
1652
 
        else
1653
 
                tab->tab_head_op_seq = op_seq;
1654
 
#else
1655
 
        tab->tab_head_op_seq = op_seq;
1656
 
#endif
1657
 
        tab->tab_wr_op_seq = op_seq;
1658
 
}
1659
 
 
1660
 
/*
1661
 
 * Apply all operations that have been buffered
1662
 
 * for a particular table.
1663
 
 * Operations are buffered if they are
1664
 
 * read from the log out of sequence.
1665
 
 *
1666
 
 * In this case we buffer, and wait for the
1667
 
 * out of sequence operations to arrive.
1668
 
 *
1669
 
 * When the server is running, this will always be
1670
 
 * the case. A delay occurs while a transaction 
1671
 
 * fills its private log buffer.
1672
 
 */
1673
 
static void xres_apply_operations(XTThreadPtr self, XTWriterStatePtr ws, xtBool in_sequence)
1674
 
{
1675
 
        XTTableHPtr             tab = ws->ws_ot->ot_table;
1676
 
        u_int                   i = 0;
1677
 
        XTOperationPtr  op;
1678
 
        xtBool                  check_index;
1679
 
 
1680
 
// XTDatabaseHPtr db, XTOpenTablePtr ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
1681
 
        xt_sl_lock(self, tab->tab_op_list);
1682
 
        for (;;) {
1683
 
                op = (XTOperationPtr) xt_sl_item_at(tab->tab_op_list, i);
1684
 
                if (!op)
1685
 
                        break;
1686
 
                if (in_sequence && tab->tab_wr_op_seq+1 != op->or_op_seq)
1687
 
                        break;
1688
 
                xt_db_set_size(self, &ws->ws_databuf, (size_t) op->or_op_len);
1689
 
                if (!ws->ws_db->db_xlog.xlog_rnd_read(&ws->ws_seqread, op->or_log_id, op->or_log_offset, (size_t) op->or_op_len, ws->ws_databuf.db_data, NULL, self))
1690
 
                        xt_throw(self);
1691
 
                check_index = ws->ws_in_recover && xt_comp_log_pos(op->or_log_id, op->or_log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
1692
 
                xres_apply_change(self, ws, (XTXactLogBufferDPtr) ws->ws_databuf.db_data, in_sequence, check_index, op->or_op_seq);
1693
 
                if (tab->tab_wr_wake_freeer) {
1694
 
                        if (!XTTableSeq::xt_op_is_before(tab->tab_wr_op_seq, tab->tab_wake_freeer_op))
1695
 
                                xt_wr_wake_freeer(self, ws->ws_db);
1696
 
                }
1697
 
                i++;
1698
 
        }
1699
 
        xt_sl_remove_from_front(self, tab->tab_op_list, i);
1700
 
        xt_sl_unlock(self, tab->tab_op_list);
1701
 
}
1702
 
 
1703
 
/* Check for operations still remaining on tables.
1704
 
 * These operations are applied even though operations
1705
 
 * in sequence are missing.
1706
 
 */
1707
 
static xtBool xres_sync_operations(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
1708
 
{
1709
 
        u_int                   edx;
1710
 
        XTTableEntryPtr te_ptr;
1711
 
        XTTableHPtr             tab;
1712
 
        xtBool                  op_synced = FALSE;
1713
 
 
1714
 
        xt_enum_tables_init(&edx);
1715
 
        while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {
1716
 
                /* Dirty read of tab_op_list OK, here because this is the
1717
 
                 * only thread that updates the list!
1718
 
                 */
1719
 
                if ((tab = te_ptr->te_table)) {
1720
 
                        if (xt_sl_get_size(tab->tab_op_list)) {
1721
 
                                op_synced = TRUE;
1722
 
                                if (xres_open_table(self, ws, te_ptr->te_tab_id))
1723
 
                                        xres_apply_operations(self, ws, FALSE);
1724
 
                        }
1725
 
 
1726
 
                        /* Update the pointer cache: */
1727
 
                        tab->tab_seq.xt_op_seq_set(self, tab->tab_wr_op_seq+1);
1728
 
                        tab->tab_row_eof_id = tab->tab_head_row_eof_id;
1729
 
                        tab->tab_row_free_id = tab->tab_head_row_free_id;
1730
 
                        tab->tab_row_fnum = tab->tab_head_row_fnum;
1731
 
                        tab->tab_rec_eof_id = tab->tab_head_rec_eof_id;
1732
 
                        tab->tab_rec_free_id = tab->tab_head_rec_free_id;
1733
 
                        tab->tab_rec_fnum = tab->tab_head_rec_fnum;
1734
 
                }
1735
 
        }
1736
 
        return op_synced;
1737
 
}
1738
 
 
1739
 
/* Compile-time switches for the free list check done after recovery.
 * CHECK_RECS and CORRECT_COUNT are the two flags handed to
 * xt_tab_check_free_lists() by xres_recover_table_free_counts().
 */

/* CORRECT_COUNT is TRUE only if XT_CORRECT_TABLE_FREE_COUNT is defined: */
#if defined(XT_CORRECT_TABLE_FREE_COUNT)
#define CORRECT_COUNT		TRUE
#else
#define CORRECT_COUNT		FALSE
#endif

/* CHECK_RECS is TRUE only if XT_CHECK_RECORD_FREE_COUNT is defined: */
#if defined(XT_CHECK_RECORD_FREE_COUNT)
#define CHECK_RECS			TRUE
#else
#define CHECK_RECS			FALSE
#endif

/* The recovery pass itself is only compiled in when at least one of
 * the record/row free count checks is enabled:
 */
#if defined(XT_CHECK_RECORD_FREE_COUNT) || defined(XT_CHECK_ROW_FREE_COUNT)
#define RECOVER_FREE_COUNTS
#endif
1752
 
 
1753
 
#ifdef RECOVER_FREE_COUNTS
1754
 
/* {CORRECTED-ROW-COUNT}
1755
 
 * This error can be repeated by crashing the server during
1756
 
 * high activitity, after flush table writes the table header
1757
 
 * 
1758
 
 * On recovery, the free count "from the future" is used as
1759
 
 * the starting point for subsequent allocation and frees.
1760
 
 * The count is wrong after that point.
1761
 
 *
1762
 
 * The recovery of the count only works correctly if a
1763
 
 * checkpoint is complete successfully after that table
1764
 
 * header is flushed. Basically the writing of the table
1765
 
 * header should be synchronsized with the writing of the
1766
 
 * end of the checkpoint.
1767
 
 *
1768
 
 * Another solution would be to log the count, along with
1769
 
 * the allocate and free commannds.
1770
 
 *
1771
 
 * The 3rd solution is the one used here. The count is corrected
1772
 
 * after recovery.
1773
 
 */
1774
 
static void xres_recover_table_free_counts(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
1775
 
{
1776
 
        u_int                   edx;
1777
 
        XTTableEntryPtr te_ptr;
1778
 
        XTTableHPtr             tab;
1779
 
 
1780
 
        xt_enum_tables_init(&edx);
1781
 
        while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {
1782
 
                if ((tab = te_ptr->te_table)) {
1783
 
                        if (xres_open_table(self, ws, te_ptr->te_tab_id))
1784
 
                                xt_tab_check_free_lists(self, ws->ws_ot, CHECK_RECS, CORRECT_COUNT);
1785
 
                }
1786
 
        }
1787
 
}
1788
 
#endif
1789
 
 
1790
 
/*
1791
 
 * Operations from the log are applied in sequence order.
1792
 
 * If the operations are out of sequence, they are buffered
1793
 
 * until the missing operations appear.
1794
 
 *
1795
 
 * NOTE: No lock is required because there should only be
1796
 
 * one thread that does this!
1797
 
 */
1798
 
xtPublic void xt_xres_apply_in_order(XTThreadPtr self, XTWriterStatePtr ws, xtLogID log_id, xtLogOffset log_offset, XTXactLogBufferDPtr record)
1799
 
{
1800
 
        xtOpSeqNo               op_seq;
1801
 
        xtTableID               tab_id;
1802
 
        size_t                  len;
1803
 
        xtBool                  check_index;
1804
 
 
1805
 
// XTDatabaseHPtr db, XTOpenTablePtr *ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
1806
 
        switch (record->xl.xl_status_1) {
1807
 
                case XT_LOG_ENT_REC_MODIFIED:
1808
 
                case XT_LOG_ENT_UPDATE:
1809
 
                case XT_LOG_ENT_INSERT:
1810
 
                case XT_LOG_ENT_DELETE:
1811
 
                case XT_LOG_ENT_UPDATE_BG:
1812
 
                case XT_LOG_ENT_INSERT_BG:
1813
 
                case XT_LOG_ENT_DELETE_BG:
1814
 
                        len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1) + (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
1815
 
                        op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);
1816
 
                        tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
1817
 
                        break;
1818
 
                case XT_LOG_ENT_UPDATE_FL:
1819
 
                case XT_LOG_ENT_INSERT_FL:
1820
 
                case XT_LOG_ENT_DELETE_FL:
1821
 
                case XT_LOG_ENT_UPDATE_FL_BG:
1822
 
                case XT_LOG_ENT_INSERT_FL_BG:
1823
 
                case XT_LOG_ENT_DELETE_FL_BG:
1824
 
                        len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1) + (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
1825
 
                        op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);
1826
 
                        tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
1827
 
                        break;
1828
 
                case XT_DEFUNKT_REC_FREED:
1829
 
                case XT_DEFUNKT_REC_REMOVED:
1830
 
                case XT_DEFUNKT_REC_REMOVED_EXT:
1831
 
                        /* [(7)] REMOVE is now a extended version of FREE! */
1832
 
                        len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);
1833
 
                        goto fixed_len_data;
1834
 
                case XT_LOG_ENT_REC_REMOVED_BI:
1835
 
                        len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1) + (size_t) XT_GET_DISK_2(record->rb.rb_size_2);
1836
 
                        op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);
1837
 
                        tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
1838
 
                        break;
1839
 
                case XT_LOG_ENT_REC_REMOVED_BI_L:
1840
 
                        len = offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1) + (size_t) XT_GET_DISK_2(record->bl.bl_size_2);
1841
 
                        op_seq = XT_GET_DISK_4(record->bl.bl_op_seq_4);
1842
 
                        tab_id = XT_GET_DISK_4(record->bl.bl_tab_id_4);
1843
 
                        break;
1844
 
                case XT_LOG_ENT_REC_MOVED:
1845
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;
1846
 
                        goto fixed_len_data;
1847
 
                case XT_LOG_ENT_REC_CLEANED:
1848
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1849
 
                        goto fixed_len_data;
1850
 
                case XT_LOG_ENT_REC_CLEANED_1:
1851
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;
1852
 
                        goto fixed_len_data;
1853
 
                case XT_LOG_ENT_REC_UNLINKED:
1854
 
                        len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1855
 
                        fixed_len_data:
1856
 
                        op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);
1857
 
                        tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
1858
 
                        break;
1859
 
                case XT_LOG_ENT_ROW_NEW:
1860
 
                        len = sizeof(XTactRowAddedEntryDRec) - 4;
1861
 
                        goto new_row;
1862
 
                case XT_LOG_ENT_ROW_NEW_FL:
1863
 
                        len = sizeof(XTactRowAddedEntryDRec);
1864
 
                        new_row:
1865
 
                        op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);
1866
 
                        tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
1867
 
                        break;
1868
 
                case XT_LOG_ENT_ROW_ADD_REC:
1869
 
                case XT_LOG_ENT_ROW_SET:
1870
 
                case XT_LOG_ENT_ROW_FREED:
1871
 
                        len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + sizeof(XTTabRowRefDRec);
1872
 
                        op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);
1873
 
                        tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);
1874
 
                        break;
1875
 
                case XT_LOG_ENT_NO_OP:
1876
 
                case XT_LOG_ENT_END_OF_LOG:
1877
 
                        return;
1878
 
                default:
1879
 
                        return;
1880
 
        }
1881
 
 
1882
 
        if (!xres_open_table(self, ws, tab_id))
1883
 
                return;
1884
 
 
1885
 
        XTTableHPtr tab = ws->ws_ot->ot_table;
1886
 
 
1887
 
        /* NOTE:
1888
 
         *
1889
 
         * During normal operation this is actually given.
1890
 
         *
1891
 
         * During recovery, it only applies to the record/row files
1892
 
         * The index file is flushed indepently, and changes may
1893
 
         * have been applied to the index (due to a call to flush index,
1894
 
         * which comes as a result of out of memory) that have not been
1895
 
         * applied to the record/row files.
1896
 
         *
1897
 
         * As a result we need to do the index checks that apply to this
1898
 
         * change.
1899
 
         *
1900
 
         * At the moment, I will just do everything, which should not
1901
 
         * hurt!
1902
 
         *
1903
 
         * This error can be repeated by running the test
1904
 
         * runTest(OUT_OF_CACHE_UPDATE_TEST, 32, OUT_OF_CACHE_UPDATE_TEST_UPDATE_COUNT, OUT_OF_CACHE_UPDATE_TEST_SET_SIZE)
1905
 
         * and crashing after a while.
1906
 
         *
1907
 
         * Do this by setting not_this to NULL. This will cause the test to
1908
 
         * hang after a while. After a restart the indexes are corrupt if the
1909
 
         * ws->ws_in_recover condition is not present here. 
1910
 
         */
1911
 
        if (ws->ws_in_recover) {
1912
 
                if (!tab->tab_op_seq_set) {
1913
 
                        /* op_seq <= tab_wr_op_seq + 1: */
1914
 
                        ASSERT(XTTableSeq::xt_op_is_before(op_seq, tab->tab_wr_op_seq+2));
1915
 
                        if (XTTableSeq::xt_op_is_before(op_seq-1, tab->tab_wr_op_seq))
1916
 
                                /* Adjust the operation sequence number: */
1917
 
                                tab->tab_wr_op_seq = op_seq-1;
1918
 
                        tab->tab_head_op_seq = tab->tab_wr_op_seq;
1919
 
                        tab->tab_op_seq_set = TRUE;
1920
 
                }
1921
 
        }
1922
 
 
1923
 
        if (!XTTableSeq::xt_op_is_before(tab->tab_wr_op_seq, op_seq))
1924
 
                return;
1925
 
 
1926
 
        if (tab->tab_wr_op_seq+1 == op_seq) {
1927
 
                /* I could use tab_ind_rec_log_id, but this may be a problem, if
1928
 
                 * recovery does not recover up to the last committed transaction.
1929
 
                 */ 
1930
 
                check_index = ws->ws_in_recover && xt_comp_log_pos(log_id, log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
1931
 
                xres_apply_change(self, ws, record, TRUE, check_index, op_seq);
1932
 
                if (tab->tab_wr_wake_freeer) {
1933
 
                        if (!XTTableSeq::xt_op_is_before(tab->tab_wr_op_seq, tab->tab_wake_freeer_op))
1934
 
                                xt_wr_wake_freeer(self, ws->ws_db);
1935
 
                }
1936
 
 
1937
 
                /* Apply any operations in the list that now follow on...
1938
 
                 * NOTE: the tab_op_list only has be locked for modification.
1939
 
                 * This is because only one thread ever changes the list
1940
 
                 * (on startup and the writer), but the checkpoint thread
1941
 
                 * reads it.
1942
 
                 */             
1943
 
                XTOperationPtr  op;
1944
 
                if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
1945
 
                        if (tab->tab_wr_op_seq+1 == op->or_op_seq) {
1946
 
                                xres_apply_operations(self, ws, TRUE);
1947
 
                        }
1948
 
                }
1949
 
        }
1950
 
        else {
1951
 
                /* Add the operation to the list: */
1952
 
                XTOperationRec op;
1953
 
 
1954
 
                op.or_op_seq = op_seq;
1955
 
                op.or_op_len = len;
1956
 
                op.or_log_id = log_id;
1957
 
                op.or_log_offset = log_offset;
1958
 
                xt_sl_lock(self, tab->tab_op_list);
1959
 
                xt_sl_insert(self, tab->tab_op_list, &op_seq, &op);
1960
 
/*DBG*/
1961
 
                //ASSERT(tab->tab_op_list->sl_usage_count < 1000000);
1962
 
                xt_sl_unlock(self, tab->tab_op_list);
1963
 
        }
1964
 
}
1965
 
 
1966
 
/* ----------------------------------------------------------------------
1967
 
 * CHECKPOINTING FUNCTIONALITY
1968
 
 */
1969
 
 
1970
 
static xtBool xres_delete_data_log(XTDatabaseHPtr db, xtLogID log_id)
1971
 
{
1972
 
        XTDataLogFilePtr        data_log;
1973
 
        char                            path[PATH_MAX];
1974
 
 
1975
 
        db->db_datalogs.dlc_name(PATH_MAX, path, log_id);
1976
 
 
1977
 
        if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))
1978
 
                return FAILED;
1979
 
 
1980
 
        if (xt_fs_exists(path)) {
1981
 
#ifdef DEBUG_LOG_DELETE
1982
 
                printf("-- delete log: %s\n", path);
1983
 
#endif
1984
 
                if (!xt_fs_delete(NULL, path))
1985
 
                        return FAILED;
1986
 
        }
1987
 
        /* The log was deleted: */
1988
 
        if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, TRUE, NULL))
1989
 
                return FAILED;
1990
 
        if (data_log) {
1991
 
                if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_DELETED))
1992
 
                        return FAILED;
1993
 
        }
1994
 
        return OK;
1995
 
}
1996
 
 
1997
 
static int xres_comp_flush_tabs(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
1998
 
{
1999
 
        xtTableID                               tab_id = *((xtTableID *) a);
2000
 
        XTCheckPointTablePtr    cp_tab = (XTCheckPointTablePtr) b;
2001
 
 
2002
 
        if (tab_id < cp_tab->cpt_tab_id)
2003
 
                return -1;
2004
 
        if (tab_id > cp_tab->cpt_tab_id)
2005
 
                return 1;
2006
 
        return 0;
2007
 
}
2008
 
 
2009
 
/* Prepare a checkpoint state for use: create its state lock and
 * mark the structure as initialised.
 */
static void xres_init_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
2010
 
{
2011
 
        xt_init_mutex_with_autoname(self, &cp->cp_state_lock);
2012
 
        /* Only flagged as initialised once the mutex call has returned: */
        cp->cp_inited = TRUE;
2013
 
}
2014
 
 
2015
 
static void xres_free_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
2016
 
{
2017
 
        cp->cp_inited = FALSE;
2018
 
        xt_free_mutex(&cp->cp_state_lock);
2019
 
        if (cp->cp_table_ids) {
2020
 
                xt_free_sortedlist(self, cp->cp_table_ids);
2021
 
                cp->cp_table_ids = NULL;
2022
 
        }
2023
 
}
2024
 
 
2025
 
/*
2026
 
 * Remove the deleted logs so that they can be re-used.
2027
 
 * This is only possible after a checkpoint has been
2028
 
 * written that does _not_ include these logs as logs
2029
 
 * to be deleted!
2030
 
 */
2031
 
static xtBool xres_remove_data_logs(XTDatabaseHPtr db)
2032
 
{
2033
 
        u_int           no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_deleted);
2034
 
        xtLogID         *log_id_ptr;
2035
 
 
2036
 
        for (u_int i=0; i<no_of_logs; i++) {
2037
 
                log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_deleted, i);
2038
 
                if (!db->db_datalogs.dlc_remove_data_log(*log_id_ptr, FALSE))
2039
 
                        return FAILED;
2040
 
        }
2041
 
        xt_sl_set_size(db->db_datalogs.dlc_deleted, 0);
2042
 
        return OK;
2043
 
}
2044
 
 
2045
 
/* ----------------------------------------------------------------------
2046
 
 * INIT & EXIT
2047
 
 */
2048
 
 
2049
 
xtPublic void xt_xres_init(XTThreadPtr self, XTDatabaseHPtr db)
2050
 
{
2051
 
        xtLogID max_log_id;
2052
 
 
2053
 
        xt_init_mutex_with_autoname(self, &db->db_cp_lock);
2054
 
        xt_init_cond(self, &db->db_cp_cond);
2055
 
        xt_init_mutex_with_autoname(self, &db->db_fl_lock);
2056
 
        
2057
 
        xres_init_checkpoint_state(self, &db->db_cp_state);
2058
 
        db->db_restart.xres_init(self, db, &db->db_wr_log_id, &db->db_wr_log_offset, &max_log_id);
2059
 
 
2060
 
        /* It is also the position where transactions will start writing the
2061
 
         * log:
2062
 
         */
2063
 
        if (!db->db_xlog.xlog_set_write_offset(db->db_wr_log_id, db->db_wr_log_offset, max_log_id, self))
2064
 
                xt_throw(self);
2065
 
}
2066
 
 
2067
 
/* Release what xt_xres_init() set up: the restart object, the
 * checkpoint state, and the checkpoint/flush synchronisation objects.
 */
xtPublic void xt_xres_exit(XTThreadPtr self, XTDatabaseHPtr db)
2068
 
{
2069
 
        db->db_restart.xres_exit(self);
2070
 
        xres_free_checkpoint_state(self, &db->db_cp_state);
2071
 
        /* The locks and the condition created in xt_xres_init(): */
        xt_free_mutex(&db->db_cp_lock);
2072
 
        xt_free_cond(&db->db_cp_cond);
2073
 
        xt_free_mutex(&db->db_fl_lock);
2074
 
}
2075
 
 
2076
 
/* ----------------------------------------------------------------------
2077
 
 * RESTART FUNCTIONALITY
2078
 
 */
2079
 
 
2080
 
/*
2081
 
 * Restart the database. This function loads the restart position, and
2082
 
 * applies all changes in the logs, until the end of the log, or
2083
 
 * a corrupted record is found.
2084
 
 *
2085
 
 * The restart position is the position in the log where we know that
2086
 
 * all the changes up to that point have been flushed to the
2087
 
 * database.
2088
 
 *
2089
 
 * This is called the checkpoint position. The checkpoint position
2090
 
 * is written alternately to 2 restart files.
2091
 
 *
2092
 
 * To make a checkpoint:
2093
 
 * Get the current log writer log offset.
2094
 
 * For each table:
2095
 
 *    Get the log offset of the next operation on the table, if an
2096
 
 *    operation is queued for the table.
2097
 
 *    Flush that table, and the operation sequence to the table.
2098
 
 * For each unclean transaction:
2099
 
 *    Get the log offset of the beginning of the transaction.
2100
 
 * Write the lowest of all log offsets to the restart file!
2101
 
 */
2102
 
 
2103
 
/* Load the checkpoint from the restart files and run recovery from it.
 *
 * Both restart files (1 and 2) are read and verified with
 * xres_check_checksum(). If both are valid, one of them is chosen by
 * comparing log positions and checkpoint numbers (see below). The
 * checkpoint values are copied into this object and into db, the data
 * logs listed in the checkpoint are deleted, and xres_restart() is called.
 * If no valid restart file exists, recovery starts at offset 0 of the
 * log returned by xt_xlog_get_min_log().
 *
 * self        the calling thread; self->st_database is set to db for the
 *             duration of the call and reset to NULL on exit.
 * db          the database being restarted.
 * log_id, log_offset, max_log_id
 *             handed on unchanged to xres_restart(); xt_xres_init() reads
 *             the values back after this call.
 *
 * On any failure the open file and the buffers are released, xres_exit()
 * is called and the exception is thrown again.
 */
void XTXactRestart::xres_init(XTThreadPtr self, XTDatabaseHPtr db, xtLogID *log_id, xtLogOffset *log_offset, xtLogID *max_log_id)
2104
 
{
2105
 
        char                                    path[PATH_MAX];
2106
 
        XTOpenFilePtr                   of = NULL;
2107
 
        XTXlogCheckpointDPtr    res_1_buffer = NULL;
2108
 
        XTXlogCheckpointDPtr    res_2_buffer = NULL;
2109
 
        XTXlogCheckpointDPtr    use_buffer;
2110
 
        xtLogID                                 ind_rec_log_id = 0;
2111
 
        xtLogOffset                             ind_rec_log_offset = 0;
2112
 
 
2113
 
        enter_();
2114
 
        xres_db = db;
2115
 
 
2116
 
        ASSERT(!self->st_database);
2117
 
        /* The following call stack:
2118
 
         * XTDatabaseLog::xlog_flush_pending()
2119
 
         * XTDatabaseLog::xlog_flush()
2120
 
         * xt_xlog_flush_log()
2121
 
         * xt_flush_indices()
2122
 
         * idx_out_of_memory_failure()
2123
 
         * xt_idx_delete()
2124
 
         * xres_remove_index_entries()
2125
 
         * xres_apply_change()
2126
 
         * xt_xres_apply_in_order()
2127
 
         * XTXactRestart::xres_restart()
2128
 
         * XTXactRestart::xres_init()
2129
 
         * Leads to st_database being used!
2130
 
         */
2131
 
        self->st_database = db;
2132
 
 
2133
 
#ifdef SKIP_STARTUP_CHECKPOINT
2134
 
        /* When debugging, we do not checkpoint immediately, just in case
2135
 
         * we detect a problem during recovery.
2136
 
         */
2137
 
        xres_cp_required = FALSE;
2138
 
#else
2139
 
        xres_cp_required = TRUE;
2140
 
#endif
2141
 
        xres_cp_number = 0;
2142
 
        try_(a) {
2143
 
 
2144
 
                /* Figure out which restart file to use.
2145
 
                 */
2146
 
                /* Restart file 1: read the whole file into res_1_buffer, and
                 * drop the buffer again if its checksum does not verify.
                 */
                xres_name(PATH_MAX, path, 1);
2147
 
                if ((of = xt_open_file(self, path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))) {
2148
 
                        size_t res_1_size;
2149
 
 
2150
 
                        res_1_size = (size_t) xt_seek_eof_file(self, of);
2151
 
                        res_1_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_1_size);
2152
 
                        if (!xt_pread_file(of, 0, res_1_size, res_1_size, res_1_buffer, NULL, &self->st_statistics.st_x, self))
2153
 
                                xt_throw(self);
2154
 
                        xt_close_file(self, of);
2155
 
                        /* Cleared so that the handler below does not close it again: */
                        of = NULL;
2156
 
                        if (!xres_check_checksum(res_1_buffer, res_1_size)) {
2157
 
                                xt_free(self, res_1_buffer);
2158
 
                                res_1_buffer = NULL;
2159
 
                        }
2160
 
                }
2161
 
 
2162
 
                /* Restart file 2: handled exactly like file 1. */
                xres_name(PATH_MAX, path, 2);
2163
 
                if ((of = xt_open_file(self, path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))) {
2164
 
                        size_t res_2_size;
2165
 
 
2166
 
                        res_2_size = (size_t) xt_seek_eof_file(self, of);
2167
 
                        res_2_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_2_size);
2168
 
                        if (!xt_pread_file(of, 0, res_2_size, res_2_size, res_2_buffer, NULL, &self->st_statistics.st_x, self))
2169
 
                                xt_throw(self);
2170
 
                        xt_close_file(self, of);
2171
 
                        of = NULL;
2172
 
                        if (!xres_check_checksum(res_2_buffer, res_2_size)) {
2173
 
                                xt_free(self, res_2_buffer);
2174
 
                                res_2_buffer = NULL;
2175
 
                        }
2176
 
                }
2177
 
 
2178
 
                /* Both files are valid: keep only one of the two buffers.
                 * File 1 wins if its log position compares greater; otherwise
                 * the checkpoint numbers decide, with file 2 winning a tie.
                 */
                if (res_1_buffer && res_2_buffer) {
2179
 
                        if (xt_comp_log_pos(
2180
 
                                XT_GET_DISK_4(res_1_buffer->xcp_log_id_4),
2181
 
                                XT_GET_DISK_6(res_1_buffer->xcp_log_offs_6),
2182
 
                                XT_GET_DISK_4(res_2_buffer->xcp_log_id_4),
2183
 
                                XT_GET_DISK_6(res_2_buffer->xcp_log_offs_6)) > 0) {
2184
 
                                /* The first log is further along than the second: */
2185
 
                                xt_free(self, res_2_buffer);
2186
 
                                res_2_buffer = NULL;
2187
 
                        }
2188
 
                        else {
2189
 
                                if (XT_GET_DISK_6(res_1_buffer->xcp_chkpnt_no_6) >
2190
 
                                        XT_GET_DISK_6(res_2_buffer->xcp_chkpnt_no_6)) {
2191
 
                                        xt_free(self, res_2_buffer);
2192
 
                                        res_2_buffer = NULL;
2193
 
                                }
2194
 
                                else {
2195
 
                                        xt_free(self, res_1_buffer);
2196
 
                                        res_1_buffer = NULL;
2197
 
                                }
2198
 
                        }
2199
 
                }
2200
 
 
2201
 
                /* At most one buffer is left. xres_next_res_no is set to the
                 * number of the file that is not in use (1 if neither is valid,
                 * in which case use_buffer ends up NULL).
                 */
                if (res_1_buffer) {
2202
 
                        use_buffer = res_1_buffer;
2203
 
                        xres_next_res_no = 2;
2204
 
                }
2205
 
                else {
2206
 
                        use_buffer = res_2_buffer;
2207
 
                        xres_next_res_no = 1;
2208
 
                }
2209
 
 
2210
 
                /* Read the checkpoint data: */
2211
 
                if (use_buffer) {
2212
 
                        u_int           no_of_logs;
2213
 
                        xtLogID         xt_log_id;
2214
 
                        xtTableID       xt_tab_id;
2215
 
 
2216
 
                        xres_cp_number = XT_GET_DISK_6(use_buffer->xcp_chkpnt_no_6);
2217
 
                        xres_cp_log_id = XT_GET_DISK_4(use_buffer->xcp_log_id_4);
2218
 
                        xres_cp_log_offset = XT_GET_DISK_6(use_buffer->xcp_log_offs_6);
2219
 
                        xt_tab_id = XT_GET_DISK_4(use_buffer->xcp_tab_id_4);
2220
 
                        /* The current table ID is only ever moved forwards: */
                        if (xt_tab_id > db->db_curr_tab_id)
2221
 
                                db->db_curr_tab_id = xt_tab_id;
2222
 
                        db->db_xn_curr_id = XT_GET_DISK_4(use_buffer->xcp_xact_id_4);
2223
 
                        ind_rec_log_id = XT_GET_DISK_4(use_buffer->xcp_ind_rec_log_id_4);
2224
 
                        ind_rec_log_offset = XT_GET_DISK_6(use_buffer->xcp_ind_rec_log_offs_6);
2225
 
                        no_of_logs = XT_GET_DISK_2(use_buffer->xcp_log_count_2);
2226
 
 
2227
 
#ifdef DEBUG_PRINT
2228
 
                        printf("CHECKPOINT log=%d offset=%d ", (int) xres_cp_log_id, (int) xres_cp_log_offset);
2229
 
                        if (no_of_logs)
2230
 
                                printf("DELETED LOGS: ");
2231
 
#endif
2232
 
 
2233
 
                        /* Logs that are deleted are locked until _after_ the next
2234
 
                         * checkpoint.
2235
 
                         *
2236
 
                         * To prevent the following problem from occurring:
2237
 
                         * - Recovery is performed, and log X is deleted 
2238
 
                         * - After delete a log is free for re-use.
2239
 
                         *   New data is written to log X.
2240
 
                         * - Server crashes.
2241
 
                         * - Recovery is performed from previous checkpoint,
2242
 
                         *   and log X is deleted again.
2243
 
                         *
2244
 
                         * To lock the logs they are placed on the deleted list.
2245
 
                         * After the next checkpoint, all logs on this list
2246
 
                         * will be removed.
2247
 
                         */
2248
 
                        /* NOTE(review): no_of_logs comes straight from the file and
                         * indexes xcp_del_log[]; presumably xres_check_checksum() has
                         * validated the buffer size against it -- confirm.
                         */
                        for (u_int i=0; i<no_of_logs; i++) {
2249
 
                                xt_log_id = (xtLogID) XT_GET_DISK_2(use_buffer->xcp_del_log[i]);
2250
 
#ifdef DEBUG_PRINT
2251
 
                                if (i != 0)
2252
 
                                        printf(", ");
2253
 
                                printf("%d", (int) xt_log_id);
2254
 
#endif
2255
 
#ifdef DEBUG_KEEP_LOGS
2256
 
                                xt_dl_set_to_delete(self, db, xt_log_id);
2257
 
#else
2258
 
                                if (!xres_delete_data_log(db, xt_log_id))
2259
 
                                        xt_throw(self);
2260
 
#endif
2261
 
                        }
2262
 
 
2263
 
#ifdef DEBUG_PRINT
2264
 
                        printf("\n");
2265
 
#endif
2266
 
                }
2267
 
                else {
2268
 
                        /* Try to determine the correct start point. */
2269
 
                        xres_cp_number = 0;
2270
 
                        xres_cp_log_id = xt_xlog_get_min_log(self, db);
2271
 
                        xres_cp_log_offset = 0;
2272
 
                        ind_rec_log_id = xres_cp_log_id;
2273
 
                        ind_rec_log_offset = xres_cp_log_offset;
2274
 
 
2275
 
#ifdef DEBUG_PRINT
2276
 
                        /* NOTE(review): the message hard-codes "log=1", while the
                         * actual start log is whatever xt_xlog_get_min_log() returned.
                         */
                        printf("CHECKPOINT log=1 offset=0\n");
2277
 
#endif
2278
 
                }
2279
 
 
2280
 
                /* The checkpoint buffers are no longer needed; free them before
                 * the restart and clear the pointers for the handler below.
                 */
                if (res_1_buffer) {
2281
 
                        xt_free(self, res_1_buffer);
2282
 
                        res_1_buffer = NULL;
2283
 
                }
2284
 
                if (res_2_buffer) {
2285
 
                        xt_free(self, res_2_buffer);
2286
 
                        res_2_buffer = NULL;
2287
 
                }
2288
 
 
2289
 
                if (!xres_restart(self, log_id, log_offset, ind_rec_log_id, ind_rec_log_offset, max_log_id))
2290
 
                        xt_throw(self);
2291
 
        }
2292
 
        catch_(a) {
2293
 
                XTException e;
2294
 
 
2295
 
                /* Release whatever is still held, then throw again: */
                xt_enter_exception_handler(self, &e);
2296
 
                self->st_database = NULL;
2297
 
                if (of)
2298
 
                        xt_close_file_ns(of);
2299
 
                if (res_1_buffer)
2300
 
                        xt_free_ns(res_1_buffer);
2301
 
                if (res_2_buffer)
2302
 
                        xt_free_ns(res_2_buffer);
2303
 
                xres_exit(self);
2304
 
                xt_exit_exception_handler(self, &e);
2305
 
                xt_throw(self);
2306
 
        }
2307
 
        cont_(a);
2308
 
        self->st_database = NULL;
2309
 
 
2310
 
        exit_();
2311
 
}
2312
 
 
2313
 
void XTXactRestart::xres_exit(XTThreadPtr XT_UNUSED(self))
2314
 
{
2315
 
}
2316
 
 
2317
 
void XTXactRestart::xres_name(size_t size, char *path, xtLogID log_id)
2318
 
{
2319
 
        char name[50];
2320
 
 
2321
 
        sprintf(name, "restart-%lu.xt", (u_long) log_id);
2322
 
        xt_strcpy(size, path, xres_db->db_main_path);
2323
 
        xt_add_system_dir(size, path);
2324
 
        xt_add_dir_char(size, path);
2325
 
        xt_strcat(size, path, name);
2326
 
}
2327
 
 
2328
 
xtBool XTXactRestart::xres_check_checksum(XTXlogCheckpointDPtr buffer, size_t size)
2329
 
{
2330
 
        size_t          head_size;
2331
 
 
2332
 
        /* The minimum size: */
2333
 
        if (size < offsetof(XTXlogCheckpointDRec, xcp_head_size_4) + 4)
2334
 
                return FAILED;
2335
 
 
2336
 
        /* Check the sizes: */
2337
 
        head_size = XT_GET_DISK_4(buffer->xcp_head_size_4);
2338
 
        if (size < head_size)
2339
 
                return FAILED;
2340
 
 
2341
 
        if (XT_GET_DISK_2(buffer->xcp_checksum_2) != xt_get_checksum(((xtWord1 *) buffer) + 2, size - 2, 1))
2342
 
                return FAILED;
2343
 
 
2344
 
        if (XT_GET_DISK_2(buffer->xcp_version_2) != XT_CHECKPOINT_VERSION)
2345
 
                return FAILED;
2346
 
 
2347
 
        return OK;
2348
 
}
2349
 
 
2350
 
void XTXactRestart::xres_recover_progress(XTThreadPtr self, XTOpenFilePtr *of, int perc)
2351
 
{
2352
 
#ifdef XT_USE_GLOBAL_DB
2353
 
        if (perc > 100) {
2354
 
                char file_path[PATH_MAX];
2355
 
 
2356
 
                if (*of) {
2357
 
                        xt_close_file(self, *of);
2358
 
                        *of = NULL;
2359
 
                }
2360
 
                xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
2361
 
                xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
2362
 
                if (xt_fs_exists(file_path))
2363
 
                        xt_fs_delete(self, file_path);
2364
 
        }
2365
 
        else {
2366
 
                char number[40];
2367
 
 
2368
 
                if (!*of) {
2369
 
                        char file_path[PATH_MAX];
2370
 
 
2371
 
                        xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
2372
 
                        xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
2373
 
                        *of = xt_open_file(self, file_path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 16*1024*1024);
2374
 
                        xt_set_eof_file(self, *of, 0);
2375
 
                }
2376
 
 
2377
 
                sprintf(number, "%d", perc);
2378
 
                if (!xt_pwrite_file(*of, 0, strlen(number), number, &self->st_statistics.st_x, self))
2379
 
                        xt_throw(self);
2380
 
                if (!xt_flush_file(*of, &self->st_statistics.st_x, self))
2381
 
                        xt_throw(self);
2382
 
        }
2383
 
#endif
2384
 
}
2385
 
 
2386
 
xtBool XTXactRestart::xres_restart(XTThreadPtr self, xtLogID *log_id, xtLogOffset *log_offset, xtLogID ind_rec_log_id, xtLogOffset ind_rec_log_offset, xtLogID *max_log_id)
2387
 
{
2388
 
        xtBool                                  ok = TRUE;
2389
 
        XTDatabaseHPtr                  db = xres_db;
2390
 
        XTXactLogBufferDPtr             record;
2391
 
        xtXactID                                xn_id;
2392
 
        XTXactDataPtr                   xact;
2393
 
        xtTableID                               tab_id;
2394
 
        XTWriterStateRec                ws;
2395
 
        off_t                                   bytes_read = 0;
2396
 
        off_t                                   bytes_to_read;
2397
 
        volatile xtBool                 print_progress = FALSE;
2398
 
        volatile off_t                  perc_size = 0, next_goal = 0;
2399
 
        int                                             perc_complete = 1, perc_to_write = 1;
2400
 
        XTOpenFilePtr                   progress_file = NULL;
2401
 
        xtBool                                  min_ram_xn_id_set = FALSE;
2402
 
        u_int                                   log_count;
2403
 
        time_t                                  start_time;
2404
 
 
2405
 
        memset(&ws, 0, sizeof(ws));
2406
 
 
2407
 
        ws.ws_db = db;
2408
 
        ws.ws_in_recover = TRUE;
2409
 
        ws.ws_ind_rec_log_id = ind_rec_log_id;
2410
 
        ws.ws_ind_rec_log_offset = ind_rec_log_offset;
2411
 
 
2412
 
        /* Initialize the data log buffer (required if extended data is
2413
 
         * referenced).
2414
 
         * Note: this buffer is freed later. It is part of the thread
2415
 
         * "open database" state, and this means that a thread
2416
 
         * may not have another database open (in use) when
2417
 
         * it calls this functions.
2418
 
         */
2419
 
        self->st_dlog_buf.dlb_init(db, xt_db_log_buffer_size);
2420
 
 
2421
 
        if (!db->db_xlog.xlog_seq_init(&ws.ws_seqread, xt_db_log_buffer_size, TRUE))
2422
 
                return FAILED;
2423
 
 
2424
 
        bytes_to_read = xres_bytes_to_read(self, db, &log_count, max_log_id);
2425
 
        /* Don't print anything about recovering an empty database: */
2426
 
        if (bytes_to_read != 0)
2427
 
                xt_logf(XT_NT_INFO, "PBXT: Recovering from %lu-%llu, bytes to read: %llu\n", (u_long) xres_cp_log_id, (u_llong) xres_cp_log_offset, (u_llong) bytes_to_read);
2428
 
 
2429
 
        print_progress = FALSE;
2430
 
        start_time = time(NULL);
2431
 
        perc_size = bytes_to_read / 100;
2432
 
        next_goal = perc_size;
2433
 
 
2434
 
        if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, xres_cp_log_id, xres_cp_log_offset, FALSE)) {
2435
 
                ok = FALSE;
2436
 
                goto failed;
2437
 
        }
2438
 
 
2439
 
        try_(a) {
2440
 
                for (;;) {
2441
 
                        if (!db->db_xlog.xlog_seq_next(&ws.ws_seqread, &record, TRUE, self)) {
2442
 
                                ok = FALSE;
2443
 
                                break;
2444
 
                        }
2445
 
                        /* Increment before. If record is NULL then xseq_record_len will be zero,
2446
 
                         * UNLESS the last record was of type XT_LOG_ENT_END_OF_LOG 
2447
 
                         * which fills the log to align to block of size 512.
2448
 
                         */
2449
 
                        bytes_read += ws.ws_seqread.xseq_record_len;
2450
 
                        if (!record)
2451
 
                                break;
2452
 
#ifdef PRINT_LOG_ON_RECOVERY
2453
 
                        xt_print_log_record(ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
2454
 
#endif
2455
 
                        if (next_goal && bytes_read >= next_goal) {
2456
 
                                while (bytes_read >= next_goal) {
2457
 
                                        next_goal += perc_size;
2458
 
                                        perc_complete++;
2459
 
                                }
2460
 
                                if (!print_progress) {
2461
 
                                        if (time(NULL) - start_time > 2)
2462
 
                                                print_progress = TRUE;
2463
 
                                }
2464
 
                                if (print_progress) {
2465
 
                                        while (perc_to_write < perc_complete) {
2466
 
                                                if (((perc_to_write - 1) % 25) == 0)
2467
 
                                                        xt_logf(XT_NT_INFO, "PBXT: ");
2468
 
                                                if ((perc_to_write % 25) == 0)
2469
 
                                                        xt_logf(XT_NT_INFO, "%2d\n", (int) perc_to_write);
2470
 
                                                else
2471
 
                                                        xt_logf(XT_NT_INFO, "%2d ", (int) perc_to_write);
2472
 
                                                xt_log_flush(self);
2473
 
                                                xres_recover_progress(self, &progress_file, perc_to_write);
2474
 
                                                perc_to_write++;
2475
 
                                        }
2476
 
                                }
2477
 
                        }
2478
 
                        switch (record->xl.xl_status_1) {
2479
 
                                case XT_LOG_ENT_HEADER:
2480
 
                                        break;
2481
 
                                case XT_LOG_ENT_NEW_LOG: {
2482
 
                                        /* Adjust the bytes read for the fact that logs are written
2483
 
                                         * on 512 byte boundaries.
2484
 
                                         */
2485
 
                                        off_t offs, eof = ws.ws_seqread.xseq_log_eof;
2486
 
 
2487
 
                                        offs = ws.ws_seqread.xseq_rec_log_offset + ws.ws_seqread.xseq_record_len;
2488
 
                                        if (eof > offs)
2489
 
                                                bytes_read += eof - offs;
2490
 
                                        if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, TRUE))
2491
 
                                                xt_throw(self);
2492
 
                                        break;
2493
 
                                }
2494
 
                                case XT_LOG_ENT_NEW_TAB:
2495
 
                                        tab_id = XT_GET_DISK_4(record->xt.xt_tab_id_4);
2496
 
                                        if (tab_id > db->db_curr_tab_id)
2497
 
                                                db->db_curr_tab_id = tab_id;
2498
 
                                        break;
2499
 
                                case XT_LOG_ENT_UPDATE_BG:
2500
 
                                case XT_LOG_ENT_INSERT_BG:
2501
 
                                case XT_LOG_ENT_DELETE_BG:
2502
 
                                        xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
2503
 
                                        goto start_xact;
2504
 
                                case XT_LOG_ENT_UPDATE_FL_BG:
2505
 
                                case XT_LOG_ENT_INSERT_FL_BG:
2506
 
                                case XT_LOG_ENT_DELETE_FL_BG:
2507
 
                                        xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
2508
 
                                        start_xact:
2509
 
                                        if (xt_xn_is_before(db->db_xn_curr_id, xn_id))
2510
 
                                                db->db_xn_curr_id = xn_id;
2511
 
 
2512
 
                                        if (!(xact = xt_xn_add_old_xact(db, xn_id, self)))
2513
 
                                                xt_throw(self);
2514
 
 
2515
 
                                        xact->xd_begin_log = ws.ws_seqread.xseq_rec_log_id;
2516
 
                                        xact->xd_begin_offset = ws.ws_seqread.xseq_rec_log_offset;
2517
 
 
2518
 
                                        xact->xd_end_xn_id = xn_id;
2519
 
                                        xact->xd_end_time = db->db_xn_end_time;
2520
 
                                        xact->xd_flags = (XT_XN_XAC_LOGGED | XT_XN_XAC_ENDED | XT_XN_XAC_RECOVERED | XT_XN_XAC_SWEEP);
2521
 
 
2522
 
                                        /* This may affect the "minimum RAM transaction": */
2523
 
                                        if (!min_ram_xn_id_set || xt_xn_is_before(xn_id, db->db_xn_min_ram_id)) {
2524
 
                                                min_ram_xn_id_set = TRUE;
2525
 
                                                db->db_xn_min_ram_id = xn_id;
2526
 
                                        }
2527
 
                                        xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
2528
 
                                        break;
2529
 
                                case XT_LOG_ENT_COMMIT:
2530
 
                                case XT_LOG_ENT_ABORT:
2531
 
                                        xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
2532
 
                                        if ((xact = xt_xn_get_xact(db, xn_id, self))) {
2533
 
                                                xact->xd_end_xn_id = xn_id;
2534
 
                                                xact->xd_flags |= XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
2535
 
                                                xact->xd_flags &= ~XT_XN_XAC_RECOVERED; // We can expect an end record on cleanup!
2536
 
                                                xact->xd_flags &= ~XT_XN_XAC_PREPARED;  // Prepared transactions cannot be swept!
2537
 
                                                if (record->xl.xl_status_1 == XT_LOG_ENT_COMMIT)
2538
 
                                                        xact->xd_flags |= XT_XN_XAC_COMMITTED;
2539
 
                                                if (xt_sl_get_size(db->db_xn_xa_list) > 0)
2540
 
                                                        xt_xn_delete_xa_data_by_xact(db, xn_id, self);
2541
 
                                        }
2542
 
                                        break;
2543
 
                                case XT_LOG_ENT_CLEANUP:
2544
 
                                        /* The transaction was cleaned up: */
2545
 
                                        xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
2546
 
                                        xt_xn_delete_xact(db, xn_id, self);
2547
 
                                        break;
2548
 
                                case XT_LOG_ENT_OP_SYNC:
2549
 
                                        xres_sync_operations(self, db, &ws);
2550
 
                                        break;
2551
 
                                case XT_LOG_ENT_DEL_LOG:
2552
 
                                        xtLogID rec_log_id;
2553
 
 
2554
 
                                        rec_log_id = XT_GET_DISK_4(record->xl.xl_log_id_4);
2555
 
                                        xt_dl_set_to_delete(self, db, rec_log_id);
2556
 
                                        break;
2557
 
                                case XT_LOG_ENT_PREPARE:
2558
 
                                        xn_id = XT_GET_DISK_4(record->xp.xp_xact_id_4);
2559
 
                                        if ((xact = xt_xn_get_xact(db, xn_id, self))) {
2560
 
                                                xact->xd_flags |= XT_XN_XAC_PREPARED;
2561
 
                                                if (!xt_xn_store_xa_data(db, xn_id, record->xp.xp_xa_len_1, record->xp.xp_xa_data, self))
2562
 
                                                        xt_throw(self);
2563
 
                                        }
2564
 
                                        break;
2565
 
                                default:
2566
 
                                        xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
2567
 
                                        break;
2568
 
                        }
2569
 
                }
2570
 
 
2571
 
                if (xres_sync_operations(self, db, &ws)) {
2572
 
                        XTactOpSyncEntryDRec    op_sync;
2573
 
                        time_t                                  now = time(NULL);
2574
 
 
2575
 
                        op_sync.os_status_1 = XT_LOG_ENT_OP_SYNC;
2576
 
                        op_sync.os_checksum_1 = XT_CHECKSUM_1(now) ^ XT_CHECKSUM_1(ws.ws_seqread.xseq_rec_log_id);
2577
 
                        XT_SET_DISK_4(op_sync.os_time_4, (xtWord4) now);
2578
 
                        /* TODO: If this is done, check to see that
2579
 
                         * the byte written here are read back by the writter.
2580
 
                         * This is in order to be in sync with 'xl_log_bytes_written'.
2581
 
                         * i.e. xl_log_bytes_written == xl_log_bytes_read
2582
 
                         */
2583
 
                        if (!db->db_xlog.xlog_write_thru(&ws.ws_seqread, sizeof(XTactOpSyncEntryDRec), (xtWord1 *) &op_sync, self))
2584
 
                                xt_throw(self);
2585
 
                }
2586
 
 
2587
 
#ifdef XT_SORT_REC_WRITES
2588
 
                /* Flush all tables where we still have cached writes: */
2589
 
                xt_xres_flush_all(self, &ws);
2590
 
#endif
2591
 
        }
2592
 
        catch_(a) {
2593
 
                ok = FALSE;
2594
 
        }
2595
 
        cont_(a);
2596
 
 
2597
 
        if (ok) {
2598
 
                if (print_progress) {
2599
 
                        while (perc_complete <= 100) {
2600
 
                                if (((perc_complete - 1) % 25) == 0)
2601
 
                                        xt_logf(XT_NT_INFO, "PBXT: ");
2602
 
                                if ((perc_complete % 25) == 0)
2603
 
                                        xt_logf(XT_NT_INFO, "%2d\n", (int) perc_complete);
2604
 
                                else
2605
 
                                        xt_logf(XT_NT_INFO, "%2d ", (int) perc_complete);
2606
 
                                xt_log_flush(self);
2607
 
                                xres_recover_progress(self, &progress_file, perc_complete);
2608
 
                                perc_complete++;
2609
 
                        }
2610
 
                }
2611
 
                if (bytes_to_read != 0)
2612
 
                        xt_logf(XT_NT_INFO, "PBXT: Recovery complete at %lu-%llu, bytes read: %llu\n", (u_long) ws.ws_seqread.xseq_rec_log_id, (u_llong) ws.ws_seqread.xseq_rec_log_offset, (u_llong) bytes_read);
2613
 
 
2614
 
                *log_id = ws.ws_seqread.xseq_rec_log_id;
2615
 
                *log_offset = ws.ws_seqread.xseq_rec_log_offset;
2616
 
 
2617
 
                if (!min_ram_xn_id_set)
2618
 
                        /* This is true because if no transaction was placed in RAM then
2619
 
                         * the next transaction in RAM will have the next ID: */
2620
 
                        db->db_xn_min_ram_id = db->db_xn_curr_id + 1;
2621
 
 
2622
 
#ifdef RECOVER_FREE_COUNTS
2623
 
                if (xres_cp_log_id != *log_id || xres_cp_log_offset != *log_offset) {
2624
 
                        /* Recovery took place, correct the row count! */
2625
 
                        xres_recover_table_free_counts(self, db, &ws);
2626
 
                }
2627
 
#endif
2628
 
        }
2629
 
 
2630
 
        failed:
2631
 
        xt_free_writer_state(self, &ws);
2632
 
        self->st_dlog_buf.dlb_exit(self);
2633
 
        xres_recover_progress(self, &progress_file, 101);
2634
 
        return ok;
2635
 
}
2636
 
 
2637
 
xtBool XTXactRestart::xres_is_checkpoint_pending(xtLogID curr_log_id, xtLogOffset curr_log_offset)
2638
 
{
2639
 
        return xt_bytes_since_last_checkpoint(xres_db, curr_log_id, curr_log_offset) >= xt_db_checkpoint_frequency;
2640
 
}
2641
 
 
2642
 
/*
2643
 
 * Calculate the bytes to be read for recovery.
2644
 
 * This is only an estimate of the number of bytes that
2645
 
 * will be read.
2646
 
 */
2647
 
off_t XTXactRestart::xres_bytes_to_read(XTThreadPtr self, XTDatabaseHPtr db, u_int *log_count, xtLogID *max_log_id)
2648
 
{
2649
 
        off_t                           to_read = 0, eof;
2650
 
        xtLogID                         log_id = xres_cp_log_id;
2651
 
        char                            log_path[PATH_MAX];
2652
 
        XTOpenFilePtr           of;
2653
 
        XTXactLogHeaderDRec     log_head;
2654
 
        xtWord1                         end_head[4];
2655
 
        size_t                          head_size;
2656
 
        size_t                          red_size;
2657
 
 
2658
 
        *max_log_id = log_id;
2659
 
        *log_count = 0;
2660
 
        for (;;) {
2661
 
                db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
2662
 
                of = NULL;
2663
 
                if (!xt_open_file_ns(&of, log_path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))
2664
 
                        xt_throw(self);
2665
 
                if (!of)
2666
 
                        break;
2667
 
                pushr_(xt_close_file, of);
2668
 
 
2669
 
                /* Check the first record of the log, to see if it is valid. */
2670
 
                if (!xt_pread_file(of, 0, sizeof(XTXactLogHeaderDRec), 0, (xtWord1 *) &log_head, &red_size, &self->st_statistics.st_xlog, self))
2671
 
                        xt_throw(self);
2672
 
                /* The minimum size (old log size): */
2673
 
                if (red_size < XT_MIN_LOG_HEAD_SIZE)
2674
 
                        goto header_corrupt;
2675
 
                head_size = XT_GET_DISK_4(log_head.xh_size_4);
2676
 
                if (log_head.xh_status_1 != XT_LOG_ENT_HEADER)
2677
 
                        goto header_corrupt;
2678
 
                if (log_head.xh_checksum_1 != XT_CHECKSUM_1(log_id))
2679
 
                        goto header_corrupt;
2680
 
                if (!xt_pread_file(of, head_size - 4, 4, 0, end_head, &red_size, &self->st_statistics.st_xlog, self))
2681
 
                        xt_throw(self);
2682
 
                if (red_size != 4 || XT_GET_DISK_4(end_head) != XT_LOG_FILE_MAGIC)
2683
 
                        goto header_corrupt;
2684
 
                if (head_size > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
2685
 
                        if (XT_GET_DISK_4(log_head.xh_log_id_4) != log_id)
2686
 
                                goto header_corrupt;
2687
 
                }
2688
 
                if (head_size > offsetof(XTXactLogHeaderDRec, xh_version_2) + 2) {
2689
 
                        if (XT_GET_DISK_2(log_head.xh_version_2) > XT_LOG_VERSION_NO)                           
2690
 
                                xt_throw_ulxterr(XT_CONTEXT, XT_ERR_NEW_TYPE_OF_XLOG, (u_long) log_id);
2691
 
                }
2692
 
 
2693
 
                eof = xt_seek_eof_file(self, of);
2694
 
                freer_(); // xt_close_file(of)
2695
 
                if (log_id == xres_cp_log_id)
2696
 
                        to_read += (eof - xres_cp_log_offset);
2697
 
                else
2698
 
                        to_read += eof;
2699
 
                (*log_count)++;
2700
 
                *max_log_id = log_id;
2701
 
                log_id++;
2702
 
        }
2703
 
        return to_read;
2704
 
 
2705
 
        header_corrupt:
2706
 
        if (log_id == xres_cp_log_id && xres_cp_log_offset > (xtLogOffset) sizeof(XTXactLogHeaderDRec))
2707
 
                xt_throw_ulxterr(XT_CONTEXT, XT_ERR_LOG_HEADER_CORRUPT, (u_long) log_id);
2708
 
 
2709
 
        freer_(); // xt_close_file(of)
2710
 
        return to_read;
2711
 
 
2712
 
}
2713
 
 
2714
 
 
2715
 
/* ----------------------------------------------------------------------
2716
 
 * C H E C K P O I N T    P R O C E S S
2717
 
 */
2718
 
 
2719
 
/* Only defined when NEVER_CHECKPOINT is set; starts out TRUE and is tested
 * by xres_cp_checkpoint() before it does any work.
 */
#ifdef NEVER_CHECKPOINT
2720
 
xtBool no_checkpoint = TRUE;
2721
 
#endif
2722
 
 
2723
 
/* Values for 'check_type' in xres_cp_checkpoint(): IF_NO_ACTIVITY is used
 * when the checkpoint is not forced; a forced checkpoint uses NO_PAUSE if
 * xres_cp_required is set and PAUSE_IF_ACTIVITY otherwise.
 */
#define XT_CHECKPOINT_IF_NO_ACTIVITY            0
2724
 
#define XT_CHECKPOINT_PAUSE_IF_ACTIVITY         1
2725
 
#define XT_CHECKPOINT_NO_PAUSE                          2
2726
 
 
2727
 
/* Number of async flush tasks xres_cp_async_checkpoint() starts before it
 * waits for their results.
 */
#define XT_ASYNC_THREAD_FLUSH_LIMIT                     200
2728
 
 
2729
 
static void xres_cp_async_checkpoint(XTThreadPtr self, XTDatabaseHPtr db, u_int curr_writer_total, xtBool force_checkpoint)
2730
 
{
2731
 
        XTTableHPtr                     tab = NULL;
2732
 
        xtTableID                       tab_id;
2733
 
        int                                     flush_bit;
2734
 
        int                                     start_count = 0;
2735
 
 
2736
 
        /* Start a checkpoint (if one is not already running): */
2737
 
        if (!xt_begin_checkpoint(db, FALSE, self))
2738
 
                xt_throw(self);
2739
 
 
2740
 
        while (!self->t_quit) {
2741
 
                if (!xres_get_next_to_flush(db, &tab_id, &flush_bit, FALSE, TRUE))
2742
 
                        goto end_checkpoint;
2743
 
 
2744
 
                if (!tab_id) {
2745
 
                        if (!xt_wait_for_async_task_results(self))
2746
 
                                goto failed;
2747
 
                        start_count = 0;
2748
 
                        continue;
2749
 
                }
2750
 
 
2751
 
                if (tab) {
2752
 
                        if (tab->tab_id != tab_id) {
2753
 
                                xt_heap_release(self, tab);
2754
 
                                tab = NULL;
2755
 
                        }
2756
 
                }
2757
 
 
2758
 
                if (!tab) {
2759
 
                        if (!(tab = xt_use_table_by_id(self, db, tab_id, NULL))) {
2760
 
                                xt_throw_ulxterr(XT_CONTEXT, XT_ERR_TABLE_NOT_FOUND, (u_long) tab_id);
2761
 
                                goto failed;
2762
 
                        }
2763
 
                }
2764
 
 
2765
 
                if (flush_bit == XT_CPT_REC_ROW_FLUSHED) {
2766
 
                        if (!xt_async_flush_record_row(tab, TRUE, self))
2767
 
                                goto failed;
2768
 
                }
2769
 
                else {
2770
 
                        ASSERT(flush_bit == XT_CPT_INDEX_FLUSHED);
2771
 
                        if (!xt_async_flush_indices(tab, TRUE, FALSE, self))
2772
 
                                goto failed;
2773
 
                }
2774
 
                start_count++;
2775
 
 
2776
 
                if (!force_checkpoint) {
2777
 
                        if (curr_writer_total != db->db_xn_total_writer_count)
2778
 
                                goto end_checkpoint;
2779
 
                }
2780
 
 
2781
 
                if (start_count >= XT_ASYNC_THREAD_FLUSH_LIMIT) {
2782
 
                        if (!xt_wait_for_async_task_results(self))
2783
 
                                goto failed;
2784
 
                        start_count = 0;
2785
 
                }
2786
 
        }
2787
 
 
2788
 
        end_checkpoint:
2789
 
        if (tab)
2790
 
                xt_heap_release(self, tab);
2791
 
 
2792
 
        if (!xt_wait_for_async_task_results(self))
2793
 
                xt_throw(self);
2794
 
 
2795
 
        if (!xt_end_checkpoint(db, self, NULL))
2796
 
                xt_throw(self);
2797
 
 
2798
 
        return;
2799
 
        
2800
 
        failed:
2801
 
        if (tab)
2802
 
                xt_heap_release(self, tab);
2803
 
        xt_throw(self);
2804
 
}
2805
 
 
2806
 
/*
2807
 
 * This function performs table flush, as long as the system is idle.
2808
 
 */
2809
 
static void xres_cp_checkpoint(XTThreadPtr self, XTDatabaseHPtr db, u_int curr_writer_total, xtBool force_checkpoint)
2810
 
{
2811
 
        XTOpenTablePtr                  ot = NULL;
2812
 
        xtTableID                               tab_id;
2813
 
        int                                             flush_bit;
2814
 
        off_t                                   bytes_flushed = 0;
2815
 
        int                                             check_type;
2816
 
 
2817
 
#ifdef NEVER_CHECKPOINT
2818
 
        if (no_checkpoint)
2819
 
                return FALSE;
2820
 
#endif
2821
 
        if (force_checkpoint) {
2822
 
                if (db->db_restart.xres_cp_required)
2823
 
                        check_type = XT_CHECKPOINT_NO_PAUSE;
2824
 
                else
2825
 
                        check_type = XT_CHECKPOINT_PAUSE_IF_ACTIVITY;
2826
 
        }
2827
 
        else
2828
 
                check_type = XT_CHECKPOINT_IF_NO_ACTIVITY;      
2829
 
 
2830
 
        /* Start a checkpoint: */
2831
 
        if (!xt_begin_checkpoint(db, FALSE, self))
2832
 
                xt_throw(self);
2833
 
 
2834
 
        while (!self->t_quit) {
2835
 
                if (!xres_get_next_to_flush(db, &tab_id, &flush_bit, TRUE, FALSE))
2836
 
                        goto end_checkpoint;
2837
 
 
2838
 
                if (ot) {
2839
 
                        if (ot->ot_table->tab_id != tab_id) {
2840
 
                                xt_db_return_table_to_pool(self, ot);
2841
 
                                ot = NULL;
2842
 
                        }
2843
 
                }
2844
 
 
2845
 
                if (!ot)
2846
 
                        ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE);
2847
 
 
2848
 
                if (ot) {
2849
 
                        if (flush_bit == XT_CPT_REC_ROW_FLUSHED) {
2850
 
                                if (!xt_flush_record_row(ot, &bytes_flushed, FALSE)) {
2851
 
                                        xt_db_return_table_to_pool(self, ot);
2852
 
                                        xt_throw(self);
2853
 
                                }
2854
 
                        }
2855
 
                        else {
2856
 
                                ASSERT(flush_bit == XT_CPT_INDEX_FLUSHED);
2857
 
                                if (!xt_flush_indices(ot, &bytes_flushed, FALSE, NULL)) {
2858
 
                                        xt_db_return_table_to_pool(self, ot);
2859
 
                                        xt_throw(self);
2860
 
                                }
2861
 
                        }
2862
 
                }
2863
 
                else {
2864
 
                        /* The table was not found. Question, should I just complete the checkpoint,
2865
 
                         * or should I cause an error?
2866
 
                         *
2867
 
                         * At the moment, I print the error and warning, and continue with the checkpoint.
2868
 
                         */
2869
 
                        if (self->t_exception.e_xt_err)
2870
 
                                xt_log_and_clear_exception(self);
2871
 
                        xt_logf(XT_NT_WARNING, "Checkpoint skipping table (ID) %lu: table was not found\n", (u_long) tab_id);
2872
 
                        xt_checkpoint_set_flush_state(db, tab_id, XT_CPT_STATE_DONE_ALL);
2873
 
                }
2874
 
 
2875
 
                switch (check_type) {
2876
 
                        case XT_CHECKPOINT_IF_NO_ACTIVITY:
2877
 
                                if (bytes_flushed > 0 && curr_writer_total != db->db_xn_total_writer_count)
2878
 
                                        goto end_checkpoint;
2879
 
                                break;
2880
 
                        case XT_CHECKPOINT_PAUSE_IF_ACTIVITY:
2881
 
                                if (bytes_flushed > 2 * 1024 * 1024 && curr_writer_total != db->db_xn_total_writer_count) {
2882
 
                                        curr_writer_total = db->db_xn_total_writer_count;
2883
 
                                        bytes_flushed = 0;
2884
 
                                        xt_sleep_milli_second(400);
2885
 
                                }
2886
 
                                break;
2887
 
                        case XT_CHECKPOINT_NO_PAUSE:
2888
 
                                break;
2889
 
                }
2890
 
        }
2891
 
 
2892
 
        end_checkpoint:
2893
 
        if (ot)
2894
 
                xt_db_return_table_to_pool(self, ot);
2895
 
 
2896
 
        if (!xt_end_checkpoint(db, self, NULL))
2897
 
                xt_throw(self);
2898
 
}
2899
 
 
2900
 
 
2901
 
/* Wait for the log writer to tell us to do something.
2902
 
 */
2903
 
static void xres_cp_wait_for_log_writer(XTThreadPtr self, XTDatabaseHPtr db, u_long milli_secs)
2904
 
{
2905
 
        xt_lock_mutex(self, &db->db_cp_lock);
2906
 
        pushr_(xt_unlock_mutex, &db->db_cp_lock);
2907
 
        if (!self->t_quit)
2908
 
                xt_timed_wait_cond(self, &db->db_cp_cond, &db->db_cp_lock, milli_secs);
2909
 
        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
2910
 
}
2911
 
 
2912
 
static void xres_flush_indices(XTThreadPtr self, XTDatabaseHPtr db)
2913
 
{
2914
 
        u_int                   edx;
2915
 
        XTTableEntryPtr te_ptr;
2916
 
        XTTableHPtr             tab;
2917
 
 
2918
 
        if (!xt_db_index_dirty_threshold || xt_get_index_cache_dirty_perc() < xt_db_index_dirty_threshold)
2919
 
                return;
2920
 
 
2921
 
        /* Flush all indexes: */
2922
 
        xt_enum_tables_init(&edx);
2923
 
        while ((te_ptr = xt_enum_tables_next(NULL, db, &edx))) {
2924
 
                if ((tab = te_ptr->te_table)) {
2925
 
                        if (!xt_async_flush_indices(tab, FALSE, FALSE, self))
2926
 
                                xt_throw(self);
2927
 
                }
2928
 
        }
2929
 
}
2930
 
 
2931
 
/*
2932
 
 * This is the way checkpoint works:
2933
 
 *
2934
 
 * To write a checkpoint we need to flush all tables in
2935
 
 * the database.
2936
 
 *
2937
 
 * Before flushing the first table we get the checkpoint
2938
 
 * log position.
2939
 
 *
2940
 
 * After flushing all files we write of the checkpoint
2941
 
 * log position.
2942
 
 */
2943
 
static void xres_cp_main(XTThreadPtr self)
2944
 
{
2945
 
        XTDatabaseHPtr          db = self->st_database;
2946
 
        u_int                           curr_writer_total;
2947
 
        time_t                          now, last_index_flush;
2948
 
        xtBool                          async = TRUE;
2949
 
        xtXactID                        sweep_count;
2950
 
 
2951
 
        xt_set_low_priority(self);
2952
 
 
2953
 
        xt_db_approximate_time = time(NULL);
2954
 
        last_index_flush = xt_db_approximate_time;
2955
 
        while (!self->t_quit) {
2956
 
                /* Wait 2 seconds: */
2957
 
                curr_writer_total = db->db_xn_total_writer_count;
2958
 
                xt_db_approximate_time = time(NULL);
2959
 
                now = xt_db_approximate_time;
2960
 
                while (!self->t_quit && xt_db_approximate_time < now + 2 && !db->db_restart.xres_cp_required) {
2961
 
                        xres_cp_wait_for_log_writer(self, db, 400);
2962
 
                        xt_db_approximate_time = time(NULL);
2963
 
                        xt_db_free_unused_open_tables(self, db);
2964
 
                        xt_db_approximate_time = time(NULL);
2965
 
                        if (xt_db_approximate_time > last_index_flush + 2) {
2966
 
                                last_index_flush = xt_db_approximate_time;
2967
 
                                xres_flush_indices(self, db);
2968
 
                        }
2969
 
                }
2970
 
                
2971
 
                if (self->t_quit)
2972
 
                        break;
2973
 
 
2974
 
                /* If there is a long running transaction, then there is actitivity! 
2975
 
                 * And if the sweeper is not idle, then there is also activity!
2976
 
                 * And if the writer is active, then we should wait for it as well!
2977
 
                 */
2978
 
#ifdef XT_SWEEPER_SORT_XACTS
2979
 
                sweep_count = db->db_xn_curr_id + 1 - db->db_sw_to_add + db->db_sw_list_size;
2980
 
#else
2981
 
                sweep_count = db->db_xn_curr_id + 1 - db->db_xn_to_clean_id;
2982
 
#endif
2983
 
                if (!db->db_xn_long_running_count && 
2984
 
                        curr_writer_total == db->db_xn_total_writer_count && 
2985
 
                        !sweep_count &&
2986
 
                        db->db_sw_idle == XT_THREAD_IDLE && 
2987
 
                        db->db_wr_idle == XT_THREAD_IDLE) {
2988
 
                        /* No activity in 2 seconds: */
2989
 
                        if (async)
2990
 
                                xres_cp_async_checkpoint(self, db, curr_writer_total, TRUE);
2991
 
                        else
2992
 
                                xres_cp_checkpoint(self, db, curr_writer_total, FALSE);
2993
 
                }
2994
 
                else {
2995
 
                        /* There server is busy, check if we need to
2996
 
                         * write a checkpoint anyway...
2997
 
                         */
2998
 
                        if (db->db_restart.xres_cp_required ||
2999
 
                                db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
3000
 
                                /* Flush tables, until the checkpoint is complete. */
3001
 
                                if (db->db_restart.xres_is_checkpoint_pending(db->db_wr_log_id, db->db_wr_log_offset)) {
3002
 
                                        /* Wait until the writer has done enough for the checkpoint: */
3003
 
                                        if (async)
3004
 
                                                xres_cp_async_checkpoint(self, db, curr_writer_total, TRUE);
3005
 
                                        else
3006
 
                                                xres_cp_checkpoint(self, db, curr_writer_total, TRUE);
3007
 
                                }
3008
 
                                else {
3009
 
                                        /* Wake the writer if necessary: */
3010
 
                                        if (db->db_wr_idle == XT_THREAD_IDLE) {
3011
 
                                                if (!xt_broadcast_cond_ns(&db->db_wr_cond))
3012
 
                                                        xt_log_and_clear_exception_ns();
3013
 
                                        }
3014
 
                                }
3015
 
                        }
3016
 
                }
3017
 
 
3018
 
                if (curr_writer_total == db->db_xn_total_writer_count) {
3019
 
                        /* We did a checkpoint, and still, nothing has
3020
 
                         * happened....
3021
 
                         *
3022
 
                         * Wait for something to happen:
3023
 
                         */
3024
 
                        xtLogID         log_id;
3025
 
                        xtLogOffset     log_offset;
3026
 
 
3027
 
                        xt_db_approximate_time = time(NULL);
3028
 
                        while (!self->t_quit && curr_writer_total == db->db_xn_total_writer_count) {
3029
 
                                /* The writer position: */
3030
 
                                xt_lock_mutex(self, &db->db_wr_lock);
3031
 
                                pushr_(xt_unlock_mutex, &db->db_wr_lock);
3032
 
                                log_id = db->db_wr_log_id;
3033
 
                                log_offset = db->db_wr_log_offset;
3034
 
                                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
3035
 
 
3036
 
                                /* This condition means we could checkpoint: */
3037
 
                                if (!(xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
3038
 
                                        xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
3039
 
                                        xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0) &&
3040
 
                                        xt_sl_get_size(db->db_xn_xa_list) == 0)
3041
 
                                        break;
3042
 
 
3043
 
                                xres_cp_wait_for_log_writer(self, db, 400);
3044
 
                                xt_db_approximate_time = time(NULL);
3045
 
                                xt_db_free_unused_open_tables(self, db);
3046
 
                                xt_db_approximate_time = time(NULL);
3047
 
                                if (xt_db_approximate_time > last_index_flush + 2) {
3048
 
                                        last_index_flush = xt_db_approximate_time;
3049
 
                                        xres_flush_indices(self, db);
3050
 
                                }
3051
 
                        }
3052
 
                }
3053
 
        }
3054
 
}
3055
 
 
3056
 
static void *xres_cp_run_thread(XTThreadPtr self)
3057
 
{
3058
 
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
3059
 
        int                             count;
3060
 
        void                    *mysql_thread;
3061
 
 
3062
 
        if (!(mysql_thread = myxt_create_thread()))
3063
 
                xt_throw(self);
3064
 
 
3065
 
        while (!self->t_quit) {
3066
 
                try_(a) {
3067
 
                        /*
3068
 
                         * The garbage collector requires that the database
3069
 
                         * is in use because.
3070
 
                         */
3071
 
                        xt_use_database(self, db, XT_FOR_CHECKPOINTER);
3072
 
 
3073
 
                        /* This action is both safe and required (see {BACKGROUND-RELEASE-DB}) */
3074
 
                        xt_heap_release(self, self->st_database);
3075
 
 
3076
 
                        xres_cp_main(self);
3077
 
                }
3078
 
                catch_(a) {
3079
 
                        /* This error is "normal"! */
3080
 
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
3081
 
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
3082
 
                                self->t_exception.e_sys_err == SIGTERM))
3083
 
                                xt_log_and_clear_exception(self);
3084
 
                }
3085
 
                cont_(a);
3086
 
 
3087
 
                /* Avoid releasing the database (done above) */
3088
 
                self->st_database = NULL;
3089
 
                xt_unuse_database(self, self);
3090
 
 
3091
 
                /* After an exception, pause before trying again... */
3092
 
                /* Number of seconds */
3093
 
                count = 60;
3094
 
                while (!self->t_quit && count > 0) {
3095
 
                        sleep(1);
3096
 
                        count--;
3097
 
                }
3098
 
        }
3099
 
 
3100
 
   /*
3101
 
        * {MYSQL-THREAD-KILL}
3102
 
        myxt_destroy_thread(mysql_thread, TRUE);
3103
 
        */
3104
 
        return NULL;
3105
 
}
3106
 
 
3107
 
static void xres_cp_free_thread(XTThreadPtr self, void *data)
3108
 
{
3109
 
        XTDatabaseHPtr db = (XTDatabaseHPtr) data;
3110
 
 
3111
 
        if (db->db_cp_thread) {
3112
 
                xt_lock_mutex(self, &db->db_cp_lock);
3113
 
                pushr_(xt_unlock_mutex, &db->db_cp_lock);
3114
 
                db->db_cp_thread = NULL;
3115
 
                freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3116
 
        }
3117
 
}
3118
 
 
3119
 
/* Start a checkpoint, if none has been started. */
3120
 
xtPublic xtBool xt_begin_checkpoint(XTDatabaseHPtr db, xtBool have_table_lock, XTThreadPtr thread)
3121
 
{
3122
 
        XTCheckPointStatePtr    cp = &db->db_cp_state;
3123
 
        xtLogID                                 log_id;
3124
 
        xtLogOffset                             log_offset;
3125
 
        xtLogID                                 ind_rec_log_id;
3126
 
        xtLogOffset                             ind_rec_log_offset;
3127
 
        u_int                                   edx;
3128
 
        XTTableEntryPtr                 te_ptr;
3129
 
        XTTableHPtr                             tab;
3130
 
        XTOperationPtr                  op;
3131
 
        XTCheckPointTableRec    cpt;
3132
 
        XTSortedListPtr                 tables = NULL;
3133
 
        
3134
 
        /* during startup we can get an error before the checkpointer is inited */
3135
 
        if (!cp->cp_inited)
3136
 
                return FAILED;
3137
 
 
3138
 
        /* First check if a checkpoint is already running: */
3139
 
        xt_lock_mutex_ns(&cp->cp_state_lock);
3140
 
        if (cp->cp_running) {
3141
 
                xt_unlock_mutex_ns(&cp->cp_state_lock);
3142
 
                return OK;
3143
 
        }
3144
 
        if (cp->cp_table_ids) {
3145
 
                xt_free_sortedlist(NULL, cp->cp_table_ids);
3146
 
                cp->cp_table_ids = NULL;
3147
 
        }
3148
 
        xt_unlock_mutex_ns(&cp->cp_state_lock);
3149
 
        
3150
 
        /* Flush the log before we continue. This is to ensure that
3151
 
         * before we write a checkpoint, that the changes
3152
 
         * done by the sweeper and the compactor, have been
3153
 
         * applied.
3154
 
         *
3155
 
         * Note, the sweeper does not flush the log, so this is
3156
 
         * necessary!
3157
 
         *
3158
 
         * --- I have removed this flush. It is actually just a
3159
 
         * minor optimisation, which pushes the flush position
3160
 
         * below ahead.
3161
 
         *
3162
 
         * Note that the writer position used for the checkpoint
3163
 
         * _will_ be behind the current log flush position.
3164
 
         *
3165
 
         * This is because the writer cannot apply log changes
3166
 
         * until they are flushed.
3167
 
         */
3168
 
        /* This is an alternative to the above.
3169
 
        if (!xt_xlog_flush_log(db, self))
3170
 
                xt_throw(self);
3171
 
        */
3172
 
        xt_lock_mutex_ns(&db->db_wr_lock);
3173
 
 
3174
 
        /* The theoretical maximum restart log postion, is the
3175
 
         * position of the writer thread:
3176
 
         */
3177
 
        log_id = db->db_wr_log_id;
3178
 
        log_offset = db->db_wr_log_offset;
3179
 
 
3180
 
        ind_rec_log_id = db->db_xlog.xl_flush_log_id;
3181
 
        ind_rec_log_offset = db->db_xlog.xl_flush_log_offset;
3182
 
 
3183
 
        xt_unlock_mutex_ns(&db->db_wr_lock);
3184
 
 
3185
 
        /* Go through all the transactions, and find
3186
 
         * the lowest log start position of all the transactions.
3187
 
         */
3188
 
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
3189
 
                XTXactSegPtr    seg;
3190
 
 
3191
 
                seg = &db->db_xn_idx[i];
3192
 
                XT_XACT_READ_LOCK(&seg->xs_tab_lock, self);
3193
 
                for (u_int j=0; j<XT_XN_HASH_TABLE_SIZE; j++) {
3194
 
                        XTXactDataPtr   xact;
3195
 
                        
3196
 
                        xact = seg->xs_table[j];
3197
 
                        while (xact) {
3198
 
                                /* If the transaction is logged, but not cleaned: */
3199
 
                                if ((xact->xd_flags & (XT_XN_XAC_LOGGED | XT_XN_XAC_CLEANED)) == XT_XN_XAC_LOGGED) {
3200
 
                                        if (xt_comp_log_pos(log_id, log_offset, xact->xd_begin_log, xact->xd_begin_offset) > 0) {
3201
 
                                                log_id = xact->xd_begin_log;
3202
 
                                                log_offset = xact->xd_begin_offset;
3203
 
                                        }
3204
 
                                }
3205
 
                                xact = xact->xd_next_xact;
3206
 
                        }
3207
 
                }
3208
 
                XT_XACT_UNLOCK(&seg->xs_tab_lock, self, FALSE);
3209
 
        }
3210
 
 
3211
 
#ifdef TRACE_CHECKPOINT
3212
 
        printf("BEGIN CHECKPOINT %d-%llu\n", (int) log_id, (u_llong) log_offset);
3213
 
#endif
3214
 
        /* Go through all tables, and find the lowest log position.
3215
 
         * The log position stored by each table shows the position of
3216
 
         * the next operation that still needs to be applied.
3217
 
         *
3218
 
         * This comes from the list of operations which are
3219
 
         * queued for the table.
3220
 
         *
3221
 
         * This function also builds a list of tables!
3222
 
         */
3223
 
 
3224
 
        if (!(tables = xt_new_sortedlist_ns(sizeof(XTCheckPointTableRec), 20, xres_comp_flush_tabs, NULL, NULL)))
3225
 
                return FAILED;
3226
 
 
3227
 
        xt_enum_tables_init(&edx);
3228
 
        if (!have_table_lock)
3229
 
                xt_ht_lock(NULL, db->db_tables);
3230
 
        while ((te_ptr = xt_enum_tables_next(NULL, db, &edx))) {
3231
 
                if ((tab = te_ptr->te_table)) {
3232
 
                        xt_sl_lock_ns(tab->tab_op_list, thread);
3233
 
                        if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
3234
 
                                if (xt_comp_log_pos(log_id, log_offset, op->or_log_id, op->or_log_offset) > 0) {
3235
 
                                        log_id = op->or_log_id;
3236
 
                                        log_offset = op->or_log_offset;
3237
 
                                }
3238
 
                        }
3239
 
                        xt_sl_unlock(NULL, tab->tab_op_list);
3240
 
                        cpt.cpt_flushed = 0;
3241
 
                        cpt.cpt_tab_id = tab->tab_id;
3242
 
#ifdef TRACE_CHECKPOINT
3243
 
                        printf("CHECKPOINT will flush: %d %s\n", (int) tab->tab_id, tab->tab_name->ps_path);
3244
 
#endif
3245
 
                        if (!xt_sl_insert(NULL, tables, &tab->tab_id, &cpt)) {
3246
 
                                if (!have_table_lock)
3247
 
                                        xt_ht_unlock(NULL, db->db_tables);
3248
 
                                xt_free_sortedlist(NULL, tables);
3249
 
                                return FAILED;
3250
 
                        }
3251
 
                }
3252
 
        }
3253
 
        if (!have_table_lock)
3254
 
                xt_ht_unlock(NULL, db->db_tables);
3255
 
 
3256
 
        xt_lock_mutex_ns(&cp->cp_state_lock);
3257
 
        /* If there is a table list, then someone was faster than me! */
3258
 
        /* NOTE: log_offset can be zero, and valid!
3259
 
         * So we only check the log_id.
3260
 
         */
3261
 
        if (!cp->cp_running && log_id) {
3262
 
                cp->cp_running = TRUE;
3263
 
                cp->cp_log_id = log_id;
3264
 
                cp->cp_log_offset = log_offset;
3265
 
 
3266
 
                cp->cp_ind_rec_log_id = ind_rec_log_id;
3267
 
                cp->cp_ind_rec_log_offset = ind_rec_log_offset;
3268
 
 
3269
 
                cp->cp_flush_count = 0;
3270
 
                cp->cp_next_to_flush = 0;
3271
 
                cp->cp_table_ids = tables;
3272
 
        }
3273
 
        else
3274
 
                xt_free_sortedlist(NULL, tables);
3275
 
        xt_unlock_mutex_ns(&cp->cp_state_lock);
3276
 
 
3277
 
        /* At this point, log flushing can begin... */
3278
 
        return OK;
3279
 
}
3280
 
 
3281
 
/*
3282
 
 * This function returns TRUE if there is still something to flush according to the current
3283
 
 * checkpoint.
3284
 
 *
3285
 
 * If so, it returns the table ID and a bit indicating whether the table data, or the
3286
 
 * index should be flushed.
3287
 
 *
3288
 
 * Note, the function also checks if an asynchronous flush is in progress. It will not return
3289
 
 * a reference to a flush that is already in progress.
3290
 
 */
3291
 
static xtBool xres_get_next_to_flush(XTDatabaseHPtr db, xtTableID *tab_id, int *flush_bit, xtBool skip_busy, xtBool rec_first)
3292
 
{
3293
 
        XTCheckPointStatePtr    cp = &db->db_cp_state;
3294
 
        XTCheckPointTablePtr    cp_tab;
3295
 
        size_t                                  table_count, tab_idx;
3296
 
        xtBool                                  flush_index;
3297
 
 
3298
 
        xt_lock_mutex_ns(&cp->cp_state_lock);
3299
 
        if (!cp->cp_running || !cp->cp_table_ids)
3300
 
                goto flush_complete;
3301
 
 
3302
 
        if (!(table_count = xt_sl_get_size(cp->cp_table_ids)))
3303
 
                goto flush_complete;
3304
 
 
3305
 
        ASSERT_NS(cp->cp_flush_count <= table_count);
3306
 
        if (cp->cp_flush_count >= table_count)
3307
 
                goto flush_complete;
3308
 
        
3309
 
        /* Try all tables: */
3310
 
        for (size_t i=0; i<table_count; i++) {
3311
 
                if (cp->cp_next_to_flush > table_count*2) {
3312
 
                        cp->cp_next_to_flush = 0;
3313
 
                        if (!skip_busy) {
3314
 
                                /* Output a dummy no table, to indicate we are wrapping! */
3315
 
                                *tab_id = 0;
3316
 
                                goto flush_table;
3317
 
                        }
3318
 
                }
3319
 
 
3320
 
                if (rec_first) {
3321
 
                        /* First all record/row files are flushed, then
3322
 
                         * the index files.
3323
 
                         */
3324
 
                        if (cp->cp_next_to_flush < table_count) {
3325
 
                                tab_idx = cp->cp_next_to_flush;
3326
 
                                flush_index = FALSE; 
3327
 
                        }
3328
 
                        else {
3329
 
                                tab_idx = cp->cp_next_to_flush - table_count;
3330
 
                                flush_index = TRUE; 
3331
 
                        }
3332
 
                }
3333
 
                else {
3334
 
                        /* Flush each table, one after the other,
3335
 
                         * record/row file followed by index file:
3336
 
                         */
3337
 
                        tab_idx = cp->cp_next_to_flush / 2;
3338
 
                        flush_index = (cp->cp_next_to_flush % 2) != 0;
3339
 
                }
3340
 
 
3341
 
                cp->cp_next_to_flush++;
3342
 
 
3343
 
                if ((cp_tab = (XTCheckPointTablePtr) xt_sl_item_at(cp->cp_table_ids, tab_idx))) {
3344
 
                        *tab_id = cp_tab->cpt_tab_id;
3345
 
                        if (flush_index) {
3346
 
                                if (!(cp_tab->cpt_flushed & XT_CPT_INDEX_FLUSHED)) {
3347
 
                                        if (!skip_busy || !(cp_tab->cpt_flushed & XT_CPT_INDEX_FLUSHING)) {
3348
 
                                                *flush_bit = XT_CPT_INDEX_FLUSHED;
3349
 
                                                goto flush_table;
3350
 
                                        }
3351
 
                                }
3352
 
                        }
3353
 
                        else {
3354
 
                                if (!(cp_tab->cpt_flushed & XT_CPT_REC_ROW_FLUSHED)) {
3355
 
                                        if (!skip_busy || !(cp_tab->cpt_flushed & XT_CPT_REC_ROW_FLUSHING)) {
3356
 
                                                *flush_bit = XT_CPT_REC_ROW_FLUSHED;
3357
 
                                                goto flush_table;
3358
 
                                        }
3359
 
                                }
3360
 
                        }
3361
 
                }
3362
 
        }
3363
 
 
3364
 
        flush_complete:
3365
 
        xt_unlock_mutex_ns(&cp->cp_state_lock);
3366
 
        return FALSE;
3367
 
 
3368
 
        flush_table:
3369
 
        xt_unlock_mutex_ns(&cp->cp_state_lock);
3370
 
        return TRUE;
3371
 
}
3372
 
 
3373
 
xtPublic void xt_checkpoint_set_flush_state(XTDatabaseHPtr db, xtTableID tab_id, int state)
3374
 
{
3375
 
        XTCheckPointStatePtr    cp = &db->db_cp_state;
3376
 
        XTCheckPointTablePtr    cp_tab;
3377
 
        int                                             flush_bit = 0;
3378
 
 
3379
 
        xt_lock_mutex_ns(&cp->cp_state_lock);
3380
 
        if (cp->cp_running) {
3381
 
                cp_tab = (XTCheckPointTablePtr) xt_sl_find(NULL, cp->cp_table_ids, &tab_id);
3382
 
                if (cp_tab) {
3383
 
                        switch (state) {
3384
 
                                case XT_CPT_STATE_START_REC_ROW:
3385
 
                                        cp_tab->cpt_flushed |= XT_CPT_REC_ROW_FLUSHING;
3386
 
                                        break;
3387
 
                                case XT_CPT_STATE_STOP_REC_ROW:
3388
 
                                        cp_tab->cpt_flushed &= ~XT_CPT_REC_ROW_FLUSHING;
3389
 
                                        break;
3390
 
                                case XT_CPT_STATE_DONE_REC_ROW:
3391
 
                                        cp_tab->cpt_flushed &= ~XT_CPT_REC_ROW_FLUSHING;
3392
 
                                        flush_bit = XT_CPT_REC_ROW_FLUSHED;
3393
 
                                        break;
3394
 
                                case XT_CPT_STATE_START_INDEX:
3395
 
                                        cp_tab->cpt_flushed |= XT_CPT_INDEX_FLUSHING;
3396
 
                                        break;
3397
 
                                case XT_CPT_STATE_STOP_INDEX:
3398
 
                                        cp_tab->cpt_flushed &= ~XT_CPT_INDEX_FLUSHING;
3399
 
                                        break;
3400
 
                                case XT_CPT_STATE_DONE_INDEX:
3401
 
                                        cp_tab->cpt_flushed &= ~XT_CPT_INDEX_FLUSHING;
3402
 
                                        flush_bit = XT_CPT_INDEX_FLUSHED;
3403
 
                                        break;
3404
 
                                case XT_CPT_STATE_DONE_ALL:
3405
 
                                        cp_tab->cpt_flushed &= ~(XT_CPT_REC_ROW_FLUSHING | XT_CPT_INDEX_FLUSHING);
3406
 
                                        flush_bit = XT_CPT_REC_ROW_FLUSHED | XT_CPT_INDEX_FLUSHED;
3407
 
                                        break;
3408
 
                        }
3409
 
 
3410
 
                        if (flush_bit && ((cp_tab->cpt_flushed & XT_CPT_ALL_FLUSHED) != XT_CPT_ALL_FLUSHED)) {
3411
 
                                cp_tab->cpt_flushed |= flush_bit;
3412
 
                                if ((cp_tab->cpt_flushed & XT_CPT_ALL_FLUSHED) == XT_CPT_ALL_FLUSHED) {
3413
 
                                        ASSERT_NS(cp->cp_flush_count < xt_sl_get_size(cp->cp_table_ids));
3414
 
                                        cp->cp_flush_count++;
3415
 
                                }
3416
 
                        }
3417
 
                }
3418
 
        }
3419
 
        xt_unlock_mutex_ns(&cp->cp_state_lock);
3420
 
}
3421
 
 
3422
 
/* End a checkpoint, if a checkpoint has been started,
3423
 
 * and all checkpoint tables have been flushed
3424
 
 */
3425
 
xtPublic xtBool xt_end_checkpoint(XTDatabaseHPtr db, XTThreadPtr thread, xtBool *checkpoint_done)
3426
 
{
3427
 
        XTCheckPointStatePtr    cp = &db->db_cp_state;
3428
 
        XTXlogCheckpointDPtr    cp_buf = NULL;
3429
 
        char                                    path[PATH_MAX];
3430
 
        XTOpenFilePtr                   of;
3431
 
        u_int                                   table_count;
3432
 
        size_t                                  chk_size = 0; 
3433
 
        u_int                                   no_of_logs = 0; 
3434
 
 
3435
 
        /* As long as we have outstanding XA transactions, we may not checkpoint! */
3436
 
        if (xt_sl_get_size(db->db_xn_xa_list) > 0) {
3437
 
#ifdef DEBUG
3438
 
                printf("Checkpoint must wait\n");
3439
 
#endif
3440
 
                return OK;
3441
 
        }
3442
 
 
3443
 
#ifdef NEVER_CHECKPOINT
3444
 
        return OK;
3445
 
#endif
3446
 
        /* Lock the checkpoint state so that only on thread can do this! */
3447
 
        xt_lock_mutex_ns(&cp->cp_state_lock);
3448
 
        if (!cp->cp_running)
3449
 
                goto checkpoint_complete;
3450
 
 
3451
 
        table_count = 0;
3452
 
        if (cp->cp_table_ids)
3453
 
                table_count = xt_sl_get_size(cp->cp_table_ids);
3454
 
        if (cp->cp_flush_count < table_count) {
3455
 
                /* Checkpoint is not done, yet! */
3456
 
                xt_unlock_mutex_ns(&cp->cp_state_lock);
3457
 
                if (checkpoint_done)
3458
 
                        *checkpoint_done = FALSE;
3459
 
                return OK;
3460
 
        }
3461
 
 
3462
 
        /* Check if anything has changed since the last checkpoint,
3463
 
         * if not, there is no need to write a new checkpoint!
3464
 
         */
3465
 
        if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
3466
 
                xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
3467
 
                xt_comp_log_pos(cp->cp_log_id, cp->cp_log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0) {
3468
 
                /* A checkpoint is required if the size of the deleted
3469
 
                 * list is not zero. The reason is, I cannot remove the
3470
 
                 * logs from the deleted list BEFORE a checkpoint has been
3471
 
                 * done which does NOT include these logs.
3472
 
                 *
3473
 
                 * Even though the logs have already been deleted. They
3474
 
                 * remain on the deleted list to ensure that they are NOT
3475
 
                 * reused during this time, until the next checkpoint.
3476
 
                 *
3477
 
                 * This is done because if they are used, then on restart
3478
 
                 * they would be deleted!
3479
 
                 */
3480
 
#ifdef TRACE_CHECKPOINT
3481
 
                printf("--- END CHECKPOINT - no write\n");
3482
 
#endif
3483
 
                goto checkpoint_complete;
3484
 
        }
3485
 
 
3486
 
#ifdef TRACE_CHECKPOINT
3487
 
        printf("--- END CHECKPOINT - write start point\n");
3488
 
#endif
3489
 
        xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
3490
 
 
3491
 
        no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_to_delete);
3492
 
        chk_size = offsetof(XTXlogCheckpointDRec, xcp_del_log) + no_of_logs * 2;
3493
 
        xtLogID *log_id_ptr;
3494
 
 
3495
 
        if (!(cp_buf = (XTXlogCheckpointDPtr) xt_malloc_ns(chk_size))) {
3496
 
                xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
3497
 
                goto failed_0;
3498
 
        }
3499
 
 
3500
 
        /* Increment the checkpoint number. This value is used if 2 checkpoint have the
3501
 
         * same log number. In this case checkpoints may differ in the log files
3502
 
         * that should be deleted. Here it is important to use the most recent
3503
 
         * log file!
3504
 
         */
3505
 
        db->db_restart.xres_cp_number++;
3506
 
 
3507
 
        /* Create the checkpoint record: */
3508
 
        XT_SET_DISK_4(cp_buf->xcp_head_size_4, chk_size);
3509
 
        XT_SET_DISK_2(cp_buf->xcp_version_2, XT_CHECKPOINT_VERSION);
3510
 
        XT_SET_DISK_6(cp_buf->xcp_chkpnt_no_6, db->db_restart.xres_cp_number);
3511
 
        XT_SET_DISK_4(cp_buf->xcp_log_id_4, cp->cp_log_id);
3512
 
        XT_SET_DISK_6(cp_buf->xcp_log_offs_6, cp->cp_log_offset);
3513
 
        XT_SET_DISK_4(cp_buf->xcp_tab_id_4, db->db_curr_tab_id);
3514
 
        XT_SET_DISK_4(cp_buf->xcp_xact_id_4, db->db_xn_curr_id);
3515
 
        XT_SET_DISK_4(cp_buf->xcp_ind_rec_log_id_4, cp->cp_ind_rec_log_id);
3516
 
        XT_SET_DISK_6(cp_buf->xcp_ind_rec_log_offs_6, cp->cp_ind_rec_log_offset);
3517
 
        XT_SET_DISK_2(cp_buf->xcp_log_count_2, no_of_logs);
3518
 
 
3519
 
        for (u_int i=0; i<no_of_logs; i++) {
3520
 
                log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_to_delete, i);
3521
 
                XT_SET_DISK_2(cp_buf->xcp_del_log[i], (xtWord2) *log_id_ptr);
3522
 
        }
3523
 
 
3524
 
        XT_SET_DISK_2(cp_buf->xcp_checksum_2, xt_get_checksum(((xtWord1 *) cp_buf) + 2, chk_size - 2, 1));
3525
 
 
3526
 
        xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
3527
 
 
3528
 
        /* Write the checkpoint: */
3529
 
        db->db_restart.xres_name(PATH_MAX, path, db->db_restart.xres_next_res_no);
3530
 
        if (!(of = xt_open_file_ns(path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 16*1024*1024)))
3531
 
                goto failed_1;
3532
 
 
3533
 
        if (!xt_set_eof_file(NULL, of, 0))
3534
 
                goto failed_2;
3535
 
        if (!xt_pwrite_file(of, 0, chk_size, (xtWord1 *) cp_buf, &thread->st_statistics.st_x, thread))
3536
 
                goto failed_2;
3537
 
        if (!xt_flush_file(of, &thread->st_statistics.st_x, thread))
3538
 
                goto failed_2;
3539
 
 
3540
 
        xt_close_file_ns(of);
3541
 
 
3542
 
        /* Next time write the other restart file: */
3543
 
        db->db_restart.xres_next_res_no = (db->db_restart.xres_next_res_no % 2) + 1;
3544
 
        db->db_restart.xres_cp_log_id = cp->cp_log_id;
3545
 
        db->db_restart.xres_cp_log_offset = cp->cp_log_offset;
3546
 
        db->db_restart.xres_cp_required = FALSE;
3547
 
 
3548
 
        /*
3549
 
         * Remove all the data logs that were deleted on the
3550
 
         * last checkpoint:
3551
 
         */
3552
 
        if (!xres_remove_data_logs(db))
3553
 
                goto failed_0;
3554
 
 
3555
 
#ifndef DEBUG_KEEP_LOGS
3556
 
        /* After checkpoint, we can delete transaction logs that will no longer be required
3557
 
         * for recovery...
3558
 
         */
3559
 
        if (cp->cp_log_id > 1) {
3560
 
                xtLogID current_log_id = cp->cp_log_id;
3561
 
                xtLogID del_log_id;
3562
 
 
3563
 
#ifdef XT_NUMBER_OF_LOGS_TO_SAVE
3564
 
                if (pbxt_crash_debug) {
3565
 
                        /* To save the logs, we just consider them in use: */
3566
 
                        if (current_log_id > XT_NUMBER_OF_LOGS_TO_SAVE)
3567
 
                                current_log_id -= XT_NUMBER_OF_LOGS_TO_SAVE;
3568
 
                        else
3569
 
                                current_log_id = 1;
3570
 
                }
3571
 
#endif
3572
 
 
3573
 
                del_log_id = current_log_id - 1;
3574
 
 
3575
 
                while (del_log_id > 0) {
3576
 
                        db->db_xlog.xlog_name(PATH_MAX, path, del_log_id);
3577
 
                        if (!xt_fs_exists(path))
3578
 
                                break;
3579
 
                        del_log_id--;
3580
 
                }
3581
 
 
3582
 
                /* This was the lowest log ID that existed: */
3583
 
                del_log_id++;
3584
 
 
3585
 
                /* Delete all logs that still exist, that come before
3586
 
                 * the current log:
3587
 
                 *
3588
 
                 * Do this from least to greatest to ensure no "holes" appear.
3589
 
                 */
3590
 
                while (del_log_id < current_log_id) {
3591
 
                        switch (db->db_xlog.xlog_delete_log(del_log_id, thread)) {
3592
 
                                case OK:
3593
 
                                        break;
3594
 
                                case FAILED:
3595
 
                                        goto exit_loop;
3596
 
                                case XT_ERR:
3597
 
                                        goto failed_0;
3598
 
                        }
3599
 
                        del_log_id++;
3600
 
                }
3601
 
                exit_loop:;
3602
 
        }
3603
 
 
3604
 
        /* And we can delete data logs in the list, and place them
3605
 
         * on the deleted list.
3606
 
         */
3607
 
        xtLogID log_id;
3608
 
        for (u_int i=0; i<no_of_logs; i++) {
3609
 
                log_id = (xtLogID) XT_GET_DISK_2(cp_buf->xcp_del_log[i]);
3610
 
                if (!xres_delete_data_log(db, log_id))
3611
 
                        goto failed_0;
3612
 
        }
3613
 
#endif
3614
 
 
3615
 
        xt_free_ns(cp_buf);
3616
 
        cp_buf = NULL;
3617
 
 
3618
 
        checkpoint_complete:
3619
 
        cp->cp_running = FALSE;
3620
 
        if (cp->cp_table_ids) {
3621
 
                xt_free_sortedlist(NULL, cp->cp_table_ids);
3622
 
                cp->cp_table_ids = NULL;
3623
 
        }
3624
 
        cp->cp_flush_count = 0;
3625
 
        cp->cp_next_to_flush = 0;
3626
 
        db->db_restart.xres_cp_required = FALSE;
3627
 
        xt_unlock_mutex_ns(&cp->cp_state_lock);
3628
 
        if (checkpoint_done)
3629
 
                *checkpoint_done = TRUE;
3630
 
        return OK;
3631
 
 
3632
 
        failed_2:
3633
 
        xt_close_file_ns(of);
3634
 
 
3635
 
        failed_1:
3636
 
        xt_free_ns(cp_buf);
3637
 
 
3638
 
        failed_0:
3639
 
        if (cp_buf)
3640
 
                xt_free_ns(cp_buf);
3641
 
        xt_unlock_mutex_ns(&cp->cp_state_lock);
3642
 
        return FAILED;
3643
 
}
3644
 
 
3645
 
xtPublic xtWord8 xt_bytes_since_last_checkpoint(XTDatabaseHPtr db, xtLogID curr_log_id, xtLogOffset curr_log_offset)
3646
 
{
3647
 
        xtLogID                                 log_id;
3648
 
        xtLogOffset                             log_offset;
3649
 
        size_t                                  byte_count = 0;
3650
 
 
3651
 
        log_id = db->db_restart.xres_cp_log_id;
3652
 
        log_offset = db->db_restart.xres_cp_log_offset;
3653
 
 
3654
 
        /* Assume the logs have the threshold: */
3655
 
        if (log_id < curr_log_id) {
3656
 
                if (log_offset < xt_db_log_file_threshold)
3657
 
                        byte_count = (size_t) (xt_db_log_file_threshold - log_offset);
3658
 
                log_offset = 0;
3659
 
                log_id++;
3660
 
        }
3661
 
        while (log_id < curr_log_id) {
3662
 
                byte_count += (size_t) xt_db_log_file_threshold;
3663
 
                log_id++;
3664
 
        }
3665
 
        if (log_offset < curr_log_offset)
3666
 
                byte_count += (size_t) (curr_log_offset - log_offset);
3667
 
 
3668
 
        return byte_count;
3669
 
}
3670
 
 
3671
 
xtPublic void xt_start_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3672
 
{
3673
 
        char name[PATH_MAX];
3674
 
 
3675
 
        sprintf(name, "CP-%s", xt_last_directory_of_path(db->db_main_path));
3676
 
        xt_remove_dir_char(name);
3677
 
        db->db_cp_thread = xt_create_daemon(self, name);
3678
 
        xt_set_thread_data(db->db_cp_thread, db, xres_cp_free_thread);
3679
 
        xt_run_thread(self, db->db_cp_thread, xres_cp_run_thread);
3680
 
}
3681
 
 
3682
 
xtPublic void xt_wait_for_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3683
 
{
3684
 
        time_t          then, now;
3685
 
        xtBool          message = FALSE;
3686
 
        xtLogID         log_id;
3687
 
        xtLogOffset     log_offset;
3688
 
 
3689
 
        if (db->db_cp_thread) {
3690
 
                then = time(NULL);
3691
 
                for (;;) {
3692
 
                        xt_lock_mutex(self, &db->db_wr_lock);
3693
 
                        pushr_(xt_unlock_mutex, &db->db_wr_lock);
3694
 
                        log_id = db->db_wr_log_id;
3695
 
                        log_offset = db->db_wr_log_offset;
3696
 
                        freer_(); // xt_unlock_mutex(&db->db_wr_lock)
3697
 
 
3698
 
                        if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
3699
 
                                xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
3700
 
                                xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0)
3701
 
                                break;
3702
 
 
3703
 
                        /* Do a final checkpoint before shutdown: */
3704
 
                        db->db_restart.xres_cp_required = TRUE;
3705
 
 
3706
 
                        xt_lock_mutex(self, &db->db_cp_lock);
3707
 
                        pushr_(xt_unlock_mutex, &db->db_cp_lock);
3708
 
                        if (!xt_broadcast_cond_ns(&db->db_cp_cond)) {
3709
 
                                xt_log_and_clear_exception_ns();
3710
 
                                break;
3711
 
                        }
3712
 
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3713
 
 
3714
 
                        xt_sleep_milli_second(10);
3715
 
 
3716
 
                        now = time(NULL);
3717
 
                        if (now >= then + 16) {
3718
 
                                xt_logf(XT_NT_INFO, "Aborting wait for '%s' checkpointer\n", db->db_name);
3719
 
                                message = FALSE;
3720
 
                                break;
3721
 
                        }
3722
 
                        if (now >= then + 2) {
3723
 
                                if (!message) {
3724
 
                                        message = TRUE;
3725
 
                                        xt_logf(XT_NT_INFO, "Waiting for '%s' checkpointer...\n", db->db_name);
3726
 
                                }
3727
 
                        }
3728
 
                }
3729
 
 
3730
 
                if (message)
3731
 
                        xt_logf(XT_NT_INFO, "Checkpointer '%s' done.\n", db->db_name);
3732
 
        }
3733
 
}
3734
 
 
3735
 
xtPublic void xt_stop_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3736
 
{
3737
 
        XTThreadPtr thr_wr;
3738
 
 
3739
 
        if (db->db_cp_thread) {
3740
 
                xt_lock_mutex(self, &db->db_cp_lock);
3741
 
                pushr_(xt_unlock_mutex, &db->db_cp_lock);
3742
 
 
3743
 
                /* This pointer is safe as long as you have the transaction lock. */
3744
 
                if ((thr_wr = db->db_cp_thread)) {
3745
 
                        xtThreadID tid = thr_wr->t_id;
3746
 
 
3747
 
                        /* Make sure the thread quits when woken up. */
3748
 
                        xt_terminate_thread(self, thr_wr);
3749
 
 
3750
 
                        xt_wake_checkpointer(self, db);
3751
 
 
3752
 
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3753
 
 
3754
 
                        /*
3755
 
                         * GOTCHA: This is a wierd thing but the SIGTERM directed
3756
 
                         * at a particular thread (in this case the sweeper) was
3757
 
                         * being caught by a different thread and killing the server
3758
 
                         * sometimes. Disconcerting.
3759
 
                         * (this may only be a problem on Mac OS X)
3760
 
                        xt_kill_thread(thread);
3761
 
                         */
3762
 
                        xt_wait_for_thread_to_exit(tid, FALSE);
3763
 
 
3764
 
                        /* PMC - This should not be necessary to set the signal here, but in the
3765
 
                         * debugger the handler is not called!!?
3766
 
                        thr_wr->t_delayed_signal = SIGTERM;
3767
 
                        xt_kill_thread(thread);
3768
 
                         */
3769
 
                        db->db_cp_thread = NULL;
3770
 
                }
3771
 
                else
3772
 
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
3773
 
        }
3774
 
}
3775
 
 
3776
 
xtPublic void xt_wake_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
3777
 
{
3778
 
        if (!xt_broadcast_cond_ns(&db->db_cp_cond))
3779
 
                xt_log_and_clear_exception(self);
3780
 
}
3781
 
 
3782
 
xtPublic void xt_free_writer_state(struct XTThread *self, XTWriterStatePtr ws)
3783
 
{
3784
 
        if (ws->ws_db)
3785
 
                ws->ws_db->db_xlog.xlog_seq_exit(&ws->ws_seqread);
3786
 
        xt_db_set_size(self, &ws->ws_databuf, 0);
3787
 
        xt_ib_free(self, &ws->ws_rec_buf);
3788
 
        if (ws->ws_ot) {
3789
 
                xt_db_return_table_to_pool(self, ws->ws_ot);
3790
 
                ws->ws_ot = NULL;
3791
 
        }
3792
 
}
3793
 
 
3794
 
xtPublic void xt_dump_xlogs(XTDatabaseHPtr db, xtLogID start_log)
3795
 
{
3796
 
        XTXactSeqReadRec        seq;
3797
 
        XTXactLogBufferDPtr     record;
3798
 
        xtLogID                         log_id = db->db_restart.xres_cp_log_id;
3799
 
        char                            log_path[PATH_MAX];
3800
 
        XTThreadPtr                     thread = xt_get_self();
3801
 
 
3802
 
        /* Find the first log that still exists:*/
3803
 
        for (;;) {
3804
 
                log_id--;
3805
 
                db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
3806
 
                if (!xt_fs_exists(log_path))
3807
 
                        break;
3808
 
        }
3809
 
        log_id++;
3810
 
 
3811
 
        if (!db->db_xlog.xlog_seq_init(&seq, xt_db_log_buffer_size, FALSE))
3812
 
                return;
3813
 
 
3814
 
        if (log_id < start_log)
3815
 
                log_id = start_log;
3816
 
 
3817
 
        for (;;) {
3818
 
                db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
3819
 
                if (!xt_fs_exists(log_path))
3820
 
                        break;
3821
 
 
3822
 
                if (!db->db_xlog.xlog_seq_start(&seq, log_id, 0, FALSE))
3823
 
                        goto done;
3824
 
 
3825
 
                PRINTF("---------- DUMP LOG %d\n", (int) log_id);
3826
 
                for (;;) {
3827
 
                        if (!db->db_xlog.xlog_seq_next(&seq, &record, TRUE, thread)) {
3828
 
                                PRINTF("---------- DUMP LOG %d ERROR\n", (int) log_id);
3829
 
                                xt_log_and_clear_exception_ns();
3830
 
                                break;
3831
 
                        }
3832
 
                        if (!record) {
3833
 
                                PRINTF("---------- DUMP LOG %d DONE\n", (int) log_id);
3834
 
                                break;
3835
 
                        }
3836
 
                        xt_print_log_record(seq.xseq_rec_log_id, seq.xseq_rec_log_offset, record);
3837
 
                }
3838
 
 
3839
 
                log_id++;
3840
 
        }
3841
 
 
3842
 
        done:
3843
 
        db->db_xlog.xlog_seq_exit(&seq);
3844
 
}
3845
 
 
3846
 
/* ----------------------------------------------------------------------
3847
 
 * D A T A B A S E   R E C O V E R Y   T H R E A D
3848
 
 */
3849
 
 
3850
 
 
3851
 
/* The database recovery thread. Set by xt_xres_start_database_recovery()
 * and reset to NULL by the thread itself just before it returns.
 */
static XTThreadPtr              xres_recovery_thread;
3852
 
 
3853
 
static void *xn_xres_run_recovery_thread(XTThreadPtr self)
3854
 
{
3855
 
        THD *mysql_thread;
3856
 
 
3857
 
        if (!(mysql_thread = (THD *) myxt_create_thread()))
3858
 
                xt_throw(self);
3859
 
 
3860
 
//#ifdef DRIZZLED
3861
 
//      static const std::string plugin_name("PBXT");
3862
 
//
3863
 
//      while (!xres_recovery_thread->t_quit && !Registry::singleton().find(plugin_name))
3864
 
//              xt_sleep_milli_second(1);
3865
 
//#else
3866
 
//      while (!xres_recovery_thread->t_quit && !ha_resolve_by_legacy_type(mysql_thread, DB_TYPE_PBXT))
3867
 
//              xt_sleep_milli_second(1);
3868
 
//#endif
3869
 
        myxt_wait_pbxt_plugin_slot_assigned(self);
3870
 
 
3871
 
        if (!xres_recovery_thread->t_quit) {
3872
 
                try_(a) {
3873
 
                        XTDatabaseHPtr  db;
3874
 
 
3875
 
                        /* {GLOBAL-DB}
3876
 
                         * It can happen that something will just get in before this
3877
 
                         * thread and open/recover the database!
3878
 
                         */
3879
 
                        if (!pbxt_database) {
3880
 
                                xt_open_database(self, mysql_real_data_home, TRUE);
3881
 
                                /* {GLOBAL-DB}
3882
 
                                 * This can be done at the same time as the recovery thread,
3883
 
                                 * strictly speaking I need a lock.
3884
 
                                 */
3885
 
                                if (!pbxt_database) {
3886
 
                                        pbxt_database = self->st_database;
3887
 
                                        xt_heap_reference(self, pbxt_database);
3888
 
                                }
3889
 
                        }
3890
 
                        else
3891
 
                                xt_use_database(self, pbxt_database, XT_FOR_USER);
3892
 
 
3893
 
                        pbxt_recovery_state = XT_RECOVER_DONE;
3894
 
 
3895
 
                        /* {WAIT-FOR-SW-AFTER-RECOV}
3896
 
                         * Moved to here...
3897
 
                         */
3898
 
                        db = self->st_database;
3899
 
                        xt_lock_mutex(self, &db->db_init_sweep_lock);
3900
 
                        pushr_(xt_unlock_mutex, &db->db_init_sweep_lock);
3901
 
                        if (!db->db_init_sweep_done) {
3902
 
                                XTTableEntryPtr         te_ptr;
3903
 
                                XTTableHPtr                     tab;
3904
 
                                
3905
 
                                xt_init_sweeper_wait(self, db);
3906
 
                                
3907
 
                                /* Memory tables are not recovered. This loop will
3908
 
                                 * make sure that the can be used, even though they are not recovered!
3909
 
                                 */
3910
 
                                for (u_int i=0; i<xt_sl_get_size(db->db_table_by_id); i++) {
3911
 
                                        te_ptr = (XTTableEntryPtr) xt_sl_item_at(db->db_table_by_id, i);
3912
 
                                        if ((tab = te_ptr->te_table)) {
3913
 
                                                if (tab->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE) {
3914
 
                                                        tab->tab_recovery_not_done = FALSE;
3915
 
                                                        tab->tab_dic.dic_disable_index = XT_INDEX_OK;
3916
 
                                                }
3917
 
                                        }
3918
 
                                }
3919
 
                                
3920
 
                                db->db_init_sweep_done = TRUE;
3921
 
                        }
3922
 
                        freer_(); // xt_unlock_mutex(&db->db_init_sweep_lock)
3923
 
 
3924
 
                        pbxt_recovery_state = XT_RECOVER_SWEPT;
3925
 
                }
3926
 
                catch_(a) {
3927
 
                        pbxt_recovery_state = XT_RECOVER_ERROR;
3928
 
                        xt_log_and_clear_exception(self);
3929
 
                }
3930
 
                cont_(a);
3931
 
        }
3932
 
 
3933
 
   /*
3934
 
    * {MYSQL-THREAD-KILL}
3935
 
        * Here is the problem with destroying the thread at this
3936
 
        * point. If we had an error started, then it can lead
3937
 
        * to a callback into pbxt: pbxt_panic().
3938
 
        *
3939
 
        * This will shutdown things, making it impossible quite the
3940
 
        * thread and do a cleanup. Solution:
3941
 
        *
3942
 
        * Move the MySQL thread descruction to a later point!
3943
 
        *
3944
 
        * sql/mysqld --no-defaults --basedir=~/maria/trunk 
3945
 
        * --character-sets-dir=~/maria/trunk/sql/share/charsets 
3946
 
        * --language=~/maria/trunk/sql/share/english 
3947
 
        * --skip-networking --datadir=/tmp/x --skip-grant-tables --nonexistentoption 
3948
 
        *
3949
 
        * #0    0x003893f9 in xt_exit_databases at database_xt.cc:304
3950
 
        * #1    0x0039dc7e in pbxt_end at ha_pbxt.cc:947
3951
 
        * #2    0x0039dd27 in pbxt_panic at ha_pbxt.cc:1289
3952
 
        * #3    0x001d619e in ha_finalize_handlerton at handler.cc:391
3953
 
        * #4    0x00279d22 in plugin_deinitialize at sql_plugin.cc:816
3954
 
        * #5    0x0027bcf5 in reap_plugins at sql_plugin.cc:904
3955
 
        * #6    0x0027c38c in plugin_thdvar_cleanup at sql_plugin.cc:2513
3956
 
        * #7    0x000c0db2 in THD::~THD at sql_class.cc:934
3957
 
        * #8    0x003b025b in myxt_destroy_thread at myxt_xt.cc:2999
3958
 
        * #9    0x003b66b5 in xn_xres_run_recovery_thread at restart_xt.cc:3196
3959
 
        * #10   0x003cbfbb in xt_thread_main at thread_xt.cc:1020
3960
 
        *
3961
 
        myxt_destroy_thread(mysql_thread, TRUE);
3962
 
        */
3963
 
 
3964
 
        xres_recovery_thread = NULL;
3965
 
        return NULL;
3966
 
}
3967
 
 
3968
 
xtPublic void xt_xres_start_database_recovery(XTThreadPtr self)
3969
 
{
3970
 
        char name[PATH_MAX];
3971
 
 
3972
 
        sprintf(name, "DB-RECOVERY-%s", xt_last_directory_of_path(mysql_real_data_home));
3973
 
        xt_remove_dir_char(name);
3974
 
 
3975
 
        pbxt_recovery_state = XT_RECOVER_PENDING;
3976
 
        xres_recovery_thread = xt_create_daemon(self, name);
3977
 
        xt_run_thread(self, xres_recovery_thread, xn_xres_run_recovery_thread);
3978
 
}
3979
 
 
3980
 
xtPublic void xt_xres_terminate_recovery(XTThreadPtr self)
3981
 
{
3982
 
        XTThreadPtr thr_rec;
3983
 
 
3984
 
        /* {MYSQL-THREAD-KILL}
3985
 
         * Stack above shows that his is possible!
3986
 
         */
3987
 
        if ((thr_rec = xres_recovery_thread) && (self != xres_recovery_thread)) {
3988
 
                xtThreadID tid = thr_rec->t_id;
3989
 
 
3990
 
                xt_terminate_thread(self, thr_rec);
3991
 
 
3992
 
                xt_wait_for_thread_to_exit(tid, TRUE);
3993
 
        }
3994
 
}
3995
 
 
3996
 
/* ----------------------------------------------------------------------
3997
 
 * L O G   F L U S H    P R O C E S S
3998
 
 */
3999
 
 
4000
 
static void *xres_fl_run_thread(XTThreadPtr self)
4001
 
{
4002
 
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
4003
 
        int                             count;
4004
 
        void                    *mysql_thread;
4005
 
        xtWord8                 to_flush;
4006
 
 
4007
 
        if (!(mysql_thread = myxt_create_thread()))
4008
 
                xt_throw(self);
4009
 
 
4010
 
        while (!self->t_quit) {
4011
 
                try_(a) {
4012
 
                        /*
4013
 
                         * The garbage collector requires that the database
4014
 
                         * is in use because.
4015
 
                         */
4016
 
                        xt_use_database(self, db, XT_FOR_CHECKPOINTER);
4017
 
 
4018
 
                        /* This action is both safe and required (see details elsewhere) */
4019
 
                        xt_heap_release(self, self->st_database);
4020
 
 
4021
 
                        xt_set_low_priority(self);
4022
 
 
4023
 
                        to_flush = xt_trace_clock() + XT_XLOG_FLUSH_FREQ * 1000;
4024
 
                        for (;;) {
4025
 
                                /* Wait 1 second: */
4026
 
                                while (!self->t_quit && xt_trace_clock() < to_flush)
4027
 
                                        xt_sleep_milli_second(10);
4028
 
 
4029
 
                                if (self->t_quit)
4030
 
                                        break;
4031
 
 
4032
 
                                if (!db->db_xlog.xlog_flush(self))
4033
 
                                        xt_throw(self);
4034
 
 
4035
 
                                to_flush += XT_XLOG_FLUSH_FREQ * 1000;
4036
 
                        }
4037
 
                }
4038
 
                catch_(a) {
4039
 
                        /* This error is "normal"! */
4040
 
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
4041
 
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
4042
 
                                self->t_exception.e_sys_err == SIGTERM))
4043
 
                                xt_log_and_clear_exception(self);
4044
 
                }
4045
 
                cont_(a);
4046
 
 
4047
 
                /* Avoid releasing the database (done above) */
4048
 
                self->st_database = NULL;
4049
 
                xt_unuse_database(self, self);
4050
 
 
4051
 
                /* After an exception, pause before trying again... */
4052
 
                /* Number of seconds */
4053
 
                count = 60;
4054
 
                while (!self->t_quit && count > 0) {
4055
 
                        sleep(1);
4056
 
                        count--;
4057
 
                }
4058
 
        }
4059
 
 
4060
 
   /*
4061
 
        * {MYSQL-THREAD-KILL}
4062
 
        myxt_destroy_thread(mysql_thread, TRUE);
4063
 
        */
4064
 
        return NULL;
4065
 
}
4066
 
 
4067
 
static void xres_fl_free_thread(XTThreadPtr self, void *data)
4068
 
{
4069
 
        XTDatabaseHPtr db = (XTDatabaseHPtr) data;
4070
 
 
4071
 
        if (db->db_fl_thread) {
4072
 
                xt_lock_mutex(self, &db->db_fl_lock);
4073
 
                pushr_(xt_unlock_mutex, &db->db_fl_lock);
4074
 
                db->db_fl_thread = NULL;
4075
 
                freer_(); // xt_unlock_mutex(&db->db_fl_lock)
4076
 
        }
4077
 
}
4078
 
 
4079
 
xtPublic void xt_start_flusher(XTThreadPtr self, XTDatabaseHPtr db)
4080
 
{
4081
 
        char name[PATH_MAX];
4082
 
 
4083
 
        sprintf(name, "FL-%s", xt_last_directory_of_path(db->db_main_path));
4084
 
        xt_remove_dir_char(name);
4085
 
        db->db_fl_thread = xt_create_daemon(self, name);
4086
 
        xt_set_thread_data(db->db_fl_thread, db, xres_fl_free_thread);
4087
 
        xt_run_thread(self, db->db_fl_thread, xres_fl_run_thread);
4088
 
}
4089
 
 
4090
 
xtPublic void xt_stop_flusher(XTThreadPtr self, XTDatabaseHPtr db)
4091
 
{
4092
 
        XTThreadPtr thr_fl;
4093
 
 
4094
 
        if (db->db_fl_thread) {
4095
 
                xt_lock_mutex(self, &db->db_fl_lock);
4096
 
                pushr_(xt_unlock_mutex, &db->db_fl_lock);
4097
 
 
4098
 
                /* This pointer is safe as long as you have the transaction lock. */
4099
 
                if ((thr_fl = db->db_fl_thread)) {
4100
 
                        xtThreadID tid = thr_fl->t_id;
4101
 
 
4102
 
                        /* Make sure the thread quits when woken up. */
4103
 
                        xt_terminate_thread(self, thr_fl);
4104
 
 
4105
 
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
4106
 
 
4107
 
                        xt_wait_for_thread_to_exit(tid, FALSE);
4108
 
                        db->db_fl_thread = NULL;
4109
 
                }
4110
 
                else
4111
 
                        freer_(); // xt_unlock_mutex(&db->db_cp_lock)
4112
 
        }
4113
 
}
4114