/* Copyright (C) 2007 PrimeBase Technologies GmbH
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
* 2007-11-12 Paul McCullagh
* Restart and write data to the database.
#include "xt_config.h"
#include "mysql_priv.h"
#include <drizzled/data_home.h>
using drizzled::module::Registry;
#include "xactlog_xt.h"
#include "database_xt.h"
#include "strutil_xt.h"
#include "filesys_xt.h"
static xtBool xres_get_next_to_flush(XTDatabaseHPtr db, xtTableID *tab_id, int *flush_bit, xtBool skip_busy, xtBool rec_first);
//#define DEBUG_KEEP_LOGS
//#define PRINT_LOG_ON_RECOVERY
//#define TRACE_RECORD_DATA
//#define SKIP_STARTUP_CHECKPOINT
//#define NEVER_CHECKPOINT
//#define TRACE_CHECKPOINT
//#define PRINTF xt_ftracef
//#define PRINTF xt_trace
* -----------------------------------------------------------------------
xtPublic int pbxt_recovery_state;
* -----------------------------------------------------------------------
#ifdef TRACE_RECORD_DATA
static void xt_print_bytes(xtWord1 *buf, u_int len)
for (u_int i=0; i<len; i++) {
PRINTF("%02x ", (u_int) *buf);
void xt_print_log_record(xtLogID log, xtLogOffset offset, XTXactLogBufferDPtr record)
const char *type = NULL;
const char *rec_type = NULL;
xtRecordID rec_id = 0;
xtBool xn_set = FALSE;
XTTabRecExtDPtr rec_buf;
XTTabRecExtDPtr ext_rec;
XTTabRecFixDPtr fix_rec;
xtLogOffset log_offset = 0;
switch (record->xl.xl_status_1) {
case XT_LOG_ENT_REC_MODIFIED:
case XT_LOG_ENT_UPDATE:
case XT_LOG_ENT_INSERT:
case XT_LOG_ENT_DELETE:
case XT_LOG_ENT_UPDATE_BG:
case XT_LOG_ENT_INSERT_BG:
case XT_LOG_ENT_DELETE_BG:
op_no = XT_GET_DISK_4(record->xu.xu_op_seq_4);
tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
rec_len = XT_GET_DISK_2(record->xu.xu_size_2);
rec_buf = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
ext_rec = (XTTabRecExtDPtr) &record->xu.xu_rec_type_1;
if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
fix_rec = (XTTabRecFixDPtr) &record->xu.xu_rec_type_1;
case XT_LOG_ENT_UPDATE_FL:
case XT_LOG_ENT_INSERT_FL:
case XT_LOG_ENT_DELETE_FL:
case XT_LOG_ENT_UPDATE_FL_BG:
case XT_LOG_ENT_INSERT_FL_BG:
case XT_LOG_ENT_DELETE_FL_BG:
op_no = XT_GET_DISK_4(record->xf.xf_op_seq_4);
tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
rec_len = XT_GET_DISK_2(record->xf.xf_size_2);
rec_buf = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
ext_rec = (XTTabRecExtDPtr) &record->xf.xf_rec_type_1;
if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
fix_rec = (XTTabRecFixDPtr) &record->xf.xf_rec_type_1;
case XT_DEFUNKT_REC_FREED:
case XT_DEFUNKT_REC_REMOVED:
case XT_DEFUNKT_REC_REMOVED_EXT:
op_no = XT_GET_DISK_4(record->fr.fr_op_seq_4);
tab_id = XT_GET_DISK_4(record->fr.fr_tab_id_4);
rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
xn_id = XT_GET_DISK_4(record->fr.fr_xact_id_4);
case XT_LOG_ENT_REC_REMOVED_BI:
op_no = XT_GET_DISK_4(record->rb.rb_op_seq_4);
tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
xn_id = XT_GET_DISK_4(record->rb.rb_xact_id_4);
row_id = XT_GET_DISK_4(record->rb.rb_row_id_4);
rec_len = XT_GET_DISK_2(record->rb.rb_size_2);
rec_buf = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
if (XT_REC_IS_EXT_DLOG(record->rb.rb_rec_type_1)) {
log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
fix_rec = (XTTabRecFixDPtr) &record->rb.rb_rec_type_1;
case XT_LOG_ENT_REC_REMOVED_BI_L:
op_no = XT_GET_DISK_4(record->bl.bl_op_seq_4);
tab_id = XT_GET_DISK_4(record->bl.bl_tab_id_4);
rec_id = XT_GET_DISK_4(record->bl.bl_rec_id_4);
xn_id = XT_GET_DISK_4(record->bl.bl_xact_id_4);
row_id = XT_GET_DISK_4(record->bl.bl_row_id_4);
rec_len = XT_GET_DISK_2(record->bl.bl_size_2);
rec_buf = (XTTabRecExtDPtr) &record->bl.bl_rec_type_1;
ext_rec = (XTTabRecExtDPtr) &record->bl.bl_rec_type_1;
if (XT_REC_IS_EXT_DLOG(record->bl.bl_rec_type_1)) {
log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
fix_rec = (XTTabRecFixDPtr) &record->bl.bl_rec_type_1;
case XT_LOG_ENT_REC_MOVED:
op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
log_id = XT_GET_DISK_2(&record->xw.xw_rec_type_1); // This is actually correct
log_offset = XT_GET_DISK_6(record->xw.xw_next_rec_id_4); // This is actually correct!
case XT_LOG_ENT_REC_CLEANED:
case XT_LOG_ENT_REC_CLEANED_1:
case XT_LOG_ENT_REC_UNLINKED:
op_no = XT_GET_DISK_4(record->xw.xw_op_seq_4);
tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
case XT_LOG_ENT_ROW_NEW:
case XT_LOG_ENT_ROW_NEW_FL:
case XT_LOG_ENT_ROW_ADD_REC:
case XT_LOG_ENT_ROW_SET:
case XT_LOG_ENT_ROW_FREED:
op_no = XT_GET_DISK_4(record->xa.xa_op_seq_4);
tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
rec_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
case XT_LOG_ENT_NO_OP:
op_no = XT_GET_DISK_4(record->no.no_op_seq_4);
tab_id = XT_GET_DISK_4(record->no.no_tab_id_4);
case XT_LOG_ENT_END_OF_LOG:
switch (record->xl.xl_status_1) {
case XT_LOG_ENT_HEADER:
case XT_LOG_ENT_NEW_LOG:
rec_type = "NEW LOG";
case XT_LOG_ENT_DEL_LOG:
sprintf(buffer, "DEL LOG log=%d ", (int) XT_GET_DISK_4(record->xl.xl_log_id_4));
case XT_LOG_ENT_NEW_TAB:
rec_type = "NEW TABLE";
tab_id = XT_GET_DISK_4(record->xt.xt_tab_id_4);
case XT_LOG_ENT_COMMIT:
xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
case XT_LOG_ENT_ABORT:
xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
case XT_LOG_ENT_CLEANUP:
rec_type = "CLEANUP";
xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
case XT_LOG_ENT_REC_MODIFIED:
rec_type = "MODIFIED";
case XT_LOG_ENT_UPDATE:
case XT_LOG_ENT_UPDATE_FL:
rec_type = "UPDATE-FL";
case XT_LOG_ENT_INSERT:
case XT_LOG_ENT_INSERT_FL:
rec_type = "INSERT-FL";
case XT_LOG_ENT_DELETE:
case XT_LOG_ENT_DELETE_FL:
rec_type = "DELETE-FL";
case XT_LOG_ENT_UPDATE_BG:
rec_type = "UPDATE-BG";
case XT_LOG_ENT_UPDATE_FL_BG:
rec_type = "UPDATE-FL-BG";
case XT_LOG_ENT_INSERT_BG:
rec_type = "INSERT-BG";
case XT_LOG_ENT_INSERT_FL_BG:
rec_type = "INSERT-FL-BG";
case XT_LOG_ENT_DELETE_BG:
rec_type = "DELETE-BG";
case XT_LOG_ENT_DELETE_FL_BG:
rec_type = "DELETE-FL-BG";
case XT_DEFUNKT_REC_FREED:
rec_type = "FREE REC";
case XT_DEFUNKT_REC_REMOVED:
rec_type = "REMOVED REC";
case XT_DEFUNKT_REC_REMOVED_EXT:
rec_type = "REMOVED-X REC";
case XT_LOG_ENT_REC_REMOVED_BI:
rec_type = "REMOVED-BI REC";
case XT_LOG_ENT_REC_REMOVED_BI_L:
rec_type = "REMOVED-BI (L) REC";
case XT_LOG_ENT_REC_MOVED:
rec_type = "MOVED REC";
case XT_LOG_ENT_REC_CLEANED:
rec_type = "CLEAN REC";
case XT_LOG_ENT_REC_CLEANED_1:
rec_type = "CLEAN REC-1";
case XT_LOG_ENT_REC_UNLINKED:
rec_type = "UNLINK REC";
case XT_LOG_ENT_ROW_NEW:
rec_type = "NEW ROW";
case XT_LOG_ENT_ROW_NEW_FL:
rec_type = "NEW ROW-FL";
case XT_LOG_ENT_ROW_ADD_REC:
rec_type = "REC ADD ROW";
case XT_LOG_ENT_ROW_SET:
rec_type = "SET ROW";
case XT_LOG_ENT_ROW_FREED:
rec_type = "FREE ROW";
case XT_LOG_ENT_OP_SYNC:
rec_type = "OP SYNC";
case XT_LOG_ENT_NO_OP:
case XT_LOG_ENT_END_OF_LOG:
rec_type = "END OF LOG";
case XT_LOG_ENT_PREPARE:
rec_type = "PREPARE";
xn_id = XT_GET_DISK_4(record->xp.xp_xact_id_4);
PRINTF("log=%d offset=%d ", (int) log, (int) offset);
PRINTF("%s ", rec_type);
PRINTF("op=%lu tab=%lu %s=%lu ", (u_long) op_no, (u_long) tab_id, type, (u_long) rec_id);
PRINTF("tab=%lu ", (u_long) tab_id);
PRINTF("row=%lu ", (u_long) row_id);
PRINTF("log=%lu offset=%lu ", (u_long) log_id, (u_long) log_offset);
PRINTF("xact=%lu ", (u_long) xn_id);
#ifdef TRACE_RECORD_DATA
switch (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_MASK) {
case XT_TAB_STATUS_FREED:
case XT_TAB_STATUS_DELETE:
case XT_TAB_STATUS_FIXED:
case XT_TAB_STATUS_VARIABLE:
case XT_TAB_STATUS_EXT_DLOG:
if (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT)
rec_len -= offsetof(XTTabRecExtDRec, re_data);
xt_print_bytes((xtWord1 *) ext_rec, offsetof(XTTabRecExtDRec, re_data));
xt_print_bytes(ext_rec->re_data, rec_len);
rec_len -= offsetof(XTTabRecFixDRec, rf_data);
xt_print_bytes((xtWord1 *) fix_rec, offsetof(XTTabRecFixDRec, rf_data));
xt_print_bytes(fix_rec->rf_data, rec_len);
void check_rows(void)
static XTOpenFilePtr of = NULL;
of = xt_open_file_ns("./test/test_tab-1.xtr", XT_FS_DEFAULT);
size_t size = (size_t) xt_seek_eof_file(NULL, of);
xtWord8 *buffer = (xtWord8 *) xt_malloc_ns(size);
xt_pread_file(of, 0, size, size, buffer, NULL);
for (size_t i=0; i<size/8; i++) {
printf("%d is NULL\n", (int) i);
* -----------------------------------------------------------------------
* DELAYED WRITE FUNCTIONS
#ifdef XT_SORT_REC_WRITES
xtPublic xtBool xt_xres_delay_flush(XTOpenTablePtr ot, xtBool lock)
XTTableHPtr tab = ot->ot_table;
XTThreadPtr thread = ot->ot_thread;
if (!xt_sl_get_size(tab->tab_rec_dw_writes))
xt_sl_lock_ns(tab->tab_rec_dw_writes, ot->ot_thread);
if ((ptr = (XTDelayWritePtr) xt_sl_item_at(tab->tab_rec_dw_writes, 0))) {
size = xt_sl_get_size(tab->tab_rec_dw_writes);
ptr->dw_rec_id+1 == (ptr+1)->dw_rec_id &&
(ptr+1)->dw_offset == 0 &&
ptr->dw_data + tab->tab_dic.dic_rec_size - ptr->dw_offset == (ptr+1)->dw_data) {
XTDelayWritePtr start_ptr;
ptr->dw_rec_id+1 == (ptr+1)->dw_rec_id &&
(ptr+1)->dw_offset == 0 &&
ptr->dw_data + tab->tab_dic.dic_rec_size - ptr->dw_offset == (ptr+1)->dw_data) {
tfer = (ptr->dw_data + ptr->dw_size) - start_ptr->dw_data;
if (!xt_pwrite_file(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, start_ptr->dw_rec_id) + start_ptr->dw_offset, tfer, &tab->tab_rec_dw_data[start_ptr->dw_data], &thread->st_statistics.st_rec, thread))
if (!xt_pwrite_file(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, ptr->dw_rec_id) + ptr->dw_offset, ptr->dw_size, &tab->tab_rec_dw_data[ptr->dw_data], &thread->st_statistics.st_rec, thread))
xt_sl_set_size(tab->tab_rec_dw_writes, 0);
tab->tab_rec_dw_data_usage = 0;
tab->tab_head_op_seq = tab->tab_rec_dw_op_seq;
xt_sl_unlock_ns(tab->tab_rec_dw_writes);
xt_sl_unlock_ns(tab->tab_rec_dw_writes);
xtPublic void xt_xres_flush_all(XTThreadPtr self, XTWriterStatePtr ws)
for (int i=0; i<XT_TABLE_LIST_SIZE; i++) {
if ((tab_id = ws->ws_tab_flush_list[i])) {
if ((ot = ws->ws_ot) && ot->ot_table->tab_id == tab_id) {
if (!xt_xres_delay_flush(ot, TRUE))
if ((ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE))) {
if (!xt_xres_delay_flush(ot, TRUE))
xt_db_return_table_to_pool(self, ot);
ws->ws_tab_flush_list[i] = 0;
static void xres_table_to_flush(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)
idx = tab_id % XT_TABLE_LIST_SIZE;
if (ws->ws_tab_flush_list[idx] == tab_id)
if (!ws->ws_tab_flush_list[idx])
idx = (idx + XT_TABLE_LIST_INC) % XT_TABLE_LIST_SIZE;
if (ws->ws_tab_flush_list[idx] == tab_id)
if (!ws->ws_tab_flush_list[idx])
idx = (idx + XT_TABLE_LIST_INC) % XT_TABLE_LIST_SIZE;
if (ws->ws_tab_flush_list[idx] == tab_id)
if (!ws->ws_tab_flush_list[idx])
idx = (idx + XT_TABLE_LIST_INC) % XT_TABLE_LIST_SIZE;
if (ws->ws_tab_flush_list[idx] == tab_id)
if (!ws->ws_tab_flush_list[idx])
/* After 4 tries we consider the table list too full: */
xt_xres_flush_all(self, ws);
ws->ws_tab_flush_list[idx] = tab_id;
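/* Note: the slot search above is a small open-addressing probe. The initial slot
 * is tab_id % XT_TABLE_LIST_SIZE; on a collision the index advances by
 * XT_TABLE_LIST_INC (modulo the list size). Only 4 slots are probed in total;
 * if none of them is free or already holds this table ID, the list is treated
 * as full and xt_xres_flush_all() is called to drain it first.
 */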
static void xres_delay_write(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, size_t offs, size_t size, xtWord1 *data, xtBool in_sequence, XTWriterStatePtr ws)
XTTableHPtr tab = ot->ot_table;
ASSERT_NS(offs <= 0xFFFF);
ASSERT_NS(size <= 0xFFFF);
if (in_sequence && tab->tab_rec_dw_writes) {
XTDelayWriteRec dw, *dw_ptr;
xt_sl_lock_ns(tab->tab_rec_dw_writes, ot->ot_thread);
if (tab->tab_rec_dw_data_usage > XT_SORT_REC_MAX_BUF_SIZE) {
if (!xt_xres_delay_flush(ot, FALSE))
if ((dw_ptr = (XTDelayWritePtr) xt_sl_search(self, &idx, tab->tab_rec_dw_writes, &rec_id))) {
/* Record has already been written: */
if (offs >= dw_ptr->dw_offset) {
if (offs + size <= dw_ptr->dw_offset + dw_ptr->dw_size)
/* Completely in old block: */
memcpy(tab->tab_rec_dw_data + dw_ptr->dw_data + (offs - dw_ptr->dw_offset), data, size);
if (offs > dw_ptr->dw_offset + dw_ptr->dw_size) {
/* Records do not overlap (cannot handle this situation)! */
if (!xt_xres_delay_flush(ot, FALSE))
/* The current record overlaps, and follows on: */
part_size = offs - dw_ptr->dw_offset;
if (tab->tab_rec_dw_data_usage + part_size + size > tab->tab_rec_dw_data_size) {
if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + part_size + size))
tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + part_size + size;
memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage, tab->tab_rec_dw_data + dw_ptr->dw_data, part_size);
memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage + part_size, data, size);
dw_ptr->dw_size = (xtWord2) (part_size + size);
dw_ptr->dw_data = tab->tab_rec_dw_data_usage;
tab->tab_rec_dw_data_usage += part_size + size;
if (offs + size >= dw_ptr->dw_offset + dw_ptr->dw_size) {
/* Completely covers old update (and is bigger): */
if (tab->tab_rec_dw_data_usage + size > tab->tab_rec_dw_data_size) {
if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + size))
tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + size;
memcpy(&tab->tab_rec_dw_data[tab->tab_rec_dw_data_usage], data, size);
dw_ptr->dw_offset = (xtWord2) offs;
dw_ptr->dw_size = (xtWord2) size;
dw_ptr->dw_data = tab->tab_rec_dw_data_usage;
tab->tab_rec_dw_data_usage += size;
if (offs + size < dw_ptr->dw_offset) {
/* Records do not overlap (cannot handle this situation)! */
if (!xt_xres_delay_flush(ot, FALSE))
/* The part in the record we want to keep: */
part_size = dw_ptr->dw_offset + dw_ptr->dw_size - (offs + size);
if (tab->tab_rec_dw_data_usage + part_size + size > tab->tab_rec_dw_data_size) {
if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + part_size + size))
tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + part_size + size;
memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage, data, size);
memcpy(tab->tab_rec_dw_data + tab->tab_rec_dw_data_usage + size, tab->tab_rec_dw_data + dw_ptr->dw_data + dw_ptr->dw_size - part_size, part_size);
dw_ptr->dw_offset = (xtWord2) offs;
dw_ptr->dw_size = (xtWord2) (part_size + size);
dw_ptr->dw_data = tab->tab_rec_dw_data_usage;
tab->tab_rec_dw_data_usage += part_size + size;
/* New record added, make space for at least 3 records! */
inc_size = tab->tab_dic.dic_rec_size * 3;
ASSERT(size*3 <= inc_size);
if (tab->tab_rec_dw_data_usage + inc_size > tab->tab_rec_dw_data_size) {
if (!xt_realloc_ns((void **) &tab->tab_rec_dw_data, tab->tab_rec_dw_data_usage + (inc_size * 4)))
tab->tab_rec_dw_data_size = tab->tab_rec_dw_data_usage + (inc_size * 4);
/* This record follows on after another */
dw_ptr = (XTDelayWritePtr) xt_sl_item_at(tab->tab_rec_dw_writes, idx-1);
/* Check if the record before can be expanded! */
if (dw_ptr->dw_rec_id+1 == rec_id &&
dw_ptr->dw_offset == 0 &&
dw_ptr->dw_size >= XT_REC_EXT_HEADER_SIZE &&
dw_ptr->dw_size < tab->tab_dic.dic_rec_size &&
dw_ptr->dw_data + dw_ptr->dw_size == tab->tab_rec_dw_data_usage) {
/* Round the size up... */
tab->tab_rec_dw_data_usage = dw_ptr->dw_data + tab->tab_dic.dic_rec_size;
memcpy(&tab->tab_rec_dw_data[tab->tab_rec_dw_data_usage], data, size);
dw.dw_rec_id = rec_id;
dw.dw_offset = (xtWord2) offs;
dw.dw_size = (xtWord2) size;
dw.dw_data = tab->tab_rec_dw_data_usage;
xt_sl_insert_item_at(self, tab->tab_rec_dw_writes, idx, &dw);
tab->tab_rec_dw_data_usage += size;
xt_sl_unlock_ns(tab->tab_rec_dw_writes);
/* Add the table to the table list! */
return xres_table_to_flush(self, ws, tab->tab_id);
if (xt_sl_get_size(tab->tab_rec_dw_writes)) {
if (!xt_xres_delay_flush(ot, TRUE))
if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, rec_id) + offs, size, data))
xt_sl_unlock_ns(tab->tab_rec_dw_writes);
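/* Summary of the merge logic above: if a write for rec_id is already buffered,
 * the new range [offs, offs+size) is merged with the buffered range when they
 * touch or overlap (a fully contained write is copied in place; an overlapping
 * or covering write is re-appended to tab_rec_dw_data). Ranges that do not
 * overlap at all cannot be merged, so the buffered writes are flushed with
 * xt_xres_delay_flush() first. If rec_id is not buffered yet, a new
 * XTDelayWriteRec entry is inserted into tab_rec_dw_writes instead.
 */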
#define WRITE_REC_FILE(self, a, rec_id, off, c, d, e, ws) xres_delay_write(self, a, rec_id, off, c, d, e, ws)
static void xres_write_rec(XTThreadPtr self, XTOpenTablePtr ot, off_t offset, size_t size, xtWord1 *data)
if (!xt_tab_write_rec(ot, offset, size, data))
#define WRITE_REC_FILE(self, a, rec_id, off, c, d, e, sw) xres_write_rec(self, a, xt_rec_id_to_rec_offset(tab, rec_id) + (off), c, d)
/* ----------------------------------------------------------------------
* APPLYING CHANGES IN SEQUENCE
typedef struct XTOperation {
xtLogOffset or_log_offset;
} XTOperationRec, *XTOperationPtr;
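/* A buffered out-of-sequence operation: or_op_seq is the sort key, and
 * or_log_id, or_log_offset and or_op_len identify the log record so it can be
 * re-read and applied later by xres_apply_operations(). tab_op_list is a sorted
 * list of these entries, ordered by operation sequence number using
 * xres_cmp_op_seq() below as the comparator (see xt_xres_init_tab()).
 */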
static int xres_cmp_op_seq(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
xtOpSeqNo lf_op_seq = *((xtOpSeqNo *) a);
XTOperationPtr lf_ptr = (XTOperationPtr) b;
if (lf_op_seq == lf_ptr->or_op_seq)
if (XTTableSeq::xt_op_is_before(lf_op_seq, lf_ptr->or_op_seq))
xtPublic void xt_xres_init_tab(XTThreadPtr self, XTTableHPtr tab)
tab->tab_op_list = xt_new_sortedlist(self, sizeof(XTOperationRec), 20, 1000, xres_cmp_op_seq, NULL, NULL, TRUE, FALSE);
xtPublic void xt_xres_exit_tab(XTThreadPtr self, XTTableHPtr tab)
if (tab->tab_op_list) {
xt_free_sortedlist(self, tab->tab_op_list);
tab->tab_op_list = NULL;
static xtBool xres_open_table(XTThreadPtr self, XTWriterStatePtr ws, xtTableID tab_id)
if ((ot = ws->ws_ot)) {
if (ot->ot_table->tab_id == tab_id)
xt_db_return_table_to_pool(self, ot);
/* Don't recover tables that cannot be found, or tables that
if (ws->ws_tab_gone == tab_id || ws->ws_tab_temp == tab_id)
if ((ws->ws_ot = xt_db_open_pool_table(self, ws->ws_db, tab_id, NULL, TRUE))) {
tab = ws->ws_ot->ot_table;
/* Don't recover temporary tables! */
if (ws->ws_in_recover && XT_IS_TEMP_TABLE(tab->tab_dic.dic_tab_flags)) {
tab->tab_recovery_not_done = TRUE;
tab->tab_dic.dic_disable_index = XT_INDEX_NOT_RECOVERED;
xt_db_return_table_to_pool(self, ws->ws_ot);
ws->ws_tab_temp = tab_id;
if (!tab->tab_ind_rec_log_id) {
/* Should not happen... */
tab->tab_ind_rec_log_id = ws->ws_ind_rec_log_id;
tab->tab_ind_rec_log_offset = ws->ws_ind_rec_log_offset;
ws->ws_tab_gone = tab_id;
/* {INDEX-RECOV_ROWID}
* Add missing index entries during recovery.
* Set the row ID even if the index entry
* is not committed. It will be removed later by
static xtBool xres_add_index_entries(XTOpenTablePtr ot, xtRowID row_id, xtRecordID rec_id, xtWord1 *rec_data)
XTTableHPtr tab = ot->ot_table;
//XTIdxSearchKeyRec key;
if (tab->tab_dic.dic_disable_index)
for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
if (!xt_idx_insert(ot, *ind, row_id, rec_id, rec_data, NULL, TRUE)) {
/* Check the error, certain errors are recoverable! */
XTThreadPtr self = xt_get_self();
if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
(XT_FILE_IN_USE(self->t_exception.e_sys_err) ||
XT_FILE_ACCESS_DENIED(self->t_exception.e_sys_err) ||
XT_FILE_TOO_MANY_OPEN(self->t_exception.e_sys_err) ||
self->t_exception.e_sys_err == XT_ENOMEM)) {
ot->ot_err_index_no = (*ind)->mi_index_no;
/* TODO: Write something to the index header to indicate that
xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
xt_log_and_clear_exception_ns();
static void xres_remove_index_entries(XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *rec_data)
XTTableHPtr tab = ot->ot_table;
if (tab->tab_dic.dic_disable_index)
for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
if (!xt_idx_delete(ot, *ind, rec_id, rec_data))
xt_log_and_clear_exception_ns();
static xtWord1 *xres_load_record(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *data, size_t red_size, XTInfoBufferPtr rec_buf, u_int cols_req)
XTTableHPtr tab = ot->ot_table;
rec_data = ot->ot_row_rbuffer;
ASSERT(red_size <= ot->ot_row_rbuf_size);
ASSERT(tab->tab_dic.dic_rec_size <= ot->ot_row_rbuf_size);
if (rec_data != data)
memcpy(rec_data, data, red_size);
/* It can be that less than 'dic_rec_size' was written for
* variable-length records.
* If this is the last record in the file, then we will read
* less than the actual record size.
if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, rec_data, &red_size, &self->st_statistics.st_rec, self))
if (red_size < sizeof(XTTabRecHeadDRec))
if (XT_REC_IS_FIXED(rec_data[0]))
rec_data = ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE;
if (!xt_ib_alloc(NULL, rec_buf, tab->tab_dic.dic_mysql_buf_size))
if (XT_REC_IS_VARIABLE(rec_data[0])) {
if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
if (red_size < XT_REC_EXT_HEADER_SIZE)
if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
if (!xt_tab_load_ext_data(ot, rec_id, rec_buf->ib_db.db_data, cols_req))
/* This is possible, the record has already been cleaned up. */
rec_data = rec_buf->ib_db.db_data;
/* Running out of memory should not be ignored. */
if (self->t_exception.e_xt_err == XT_SYSTEM_ERROR &&
self->t_exception.e_sys_err == XT_ENOMEM)
xt_log_and_clear_exception_ns();
* Apply a change from the log.
* This function is basically very straightforward, were it not
* for the option to apply operations out of sequence.
* (i.e. in_sequence == FALSE)
* If operations are applied in sequence, then they can be
* applied blindly. The update operation is just executed as
* If the changes are not in sequence, then some operations are missing,
* however, the operations that are present are in the correct order.
* This can only happen at the end of recovery!!!
* After we have applied all operations in the log we may be
* left with some operations that have not been applied
* because operations were logged out of sequence.
* The application of these operations then has to take into
* account the current state of the database.
* They are then applied in a manner that maintains
* database consistency.
* For example, a record that is freed is freed by placing it
* on the current free list. Part of the data logged for the
* operation is ignored. Namely: the "next block" pointer
* that was originally written into the freed record.
static void xres_apply_change(XTThreadPtr self, XTWriterStatePtr ws, XTXactLogBufferDPtr record, xtBool in_sequence, xtBool check_index, xtOpSeqNo op_seq)
XTOpenTablePtr ot = ws->ws_ot;
XTInfoBufferPtr rec_buf = &ws->ws_rec_buf;
XTTableHPtr tab = ot->ot_table;
XTTabRecFreeDRec free_rec;
XTTabRowRefDRec row_buf;
XTTabRecHeadDRec rec_head;
xtRecordID link_rec_id, prev_link_rec_id;
xtWord1 *rec_data = NULL;
XTTabRecFreeDPtr free_data;
ASSERT(ot->ot_thread == self);
if (tab->tab_dic.dic_key_count == 0)
status = record->xl.xl_status_1;
case XT_LOG_ENT_REC_MODIFIED:
case XT_LOG_ENT_UPDATE:
case XT_LOG_ENT_INSERT:
case XT_LOG_ENT_DELETE:
case XT_LOG_ENT_UPDATE_BG:
case XT_LOG_ENT_INSERT_BG:
case XT_LOG_ENT_DELETE_BG:
rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
/* This should be done before we apply the change to the table, as otherwise we lose
* the key value that we need to remove from the index
if (check_index && status == XT_LOG_ENT_REC_MODIFIED) {
if ((rec_data = xres_load_record(self, ot, rec_id, NULL, 0, rec_buf, tab->tab_dic.dic_ind_cols_req)))
xres_remove_index_entries(ot, rec_id, rec_data);
len = (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
WRITE_REC_FILE(self, ot, rec_id, 0, len, (xtWord1 *) &record->xu.xu_rec_type_1, in_sequence, ws);
tab->tab_bytes_to_flush += len;
case XT_LOG_ENT_DELETE:
case XT_LOG_ENT_DELETE_BG:
if ((rec_data = xres_load_record(self, ot, rec_id, &record->xu.xu_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
/* A record has been allocated from the EOF, but out of sequence.
* This could leave a gap where other records were allocated
* from the EOF, but those operations have been lost!
* We compensate for this by adding all blocks between
free_rec.rf_rec_type_1 = XT_TAB_STATUS_FREED;
free_rec.rf_not_used_1 = 0;
while (tab->tab_head_rec_eof_id < rec_id) {
XT_SET_DISK_4(free_rec.rf_next_rec_id_4, tab->tab_head_rec_free_id);
if (!xt_tab_write_rec(ot, tab->tab_head_rec_eof_id, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec))
tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
tab->tab_head_rec_free_id = tab->tab_head_rec_eof_id;
tab->tab_head_rec_eof_id++;
if (tab->tab_head_rec_eof_id < rec_id + 1)
tab->tab_head_rec_eof_id = rec_id + 1;
tab->tab_flush_pending = TRUE;
case XT_LOG_ENT_UPDATE_FL:
case XT_LOG_ENT_INSERT_FL:
case XT_LOG_ENT_DELETE_FL:
case XT_LOG_ENT_UPDATE_FL_BG:
case XT_LOG_ENT_INSERT_FL_BG:
case XT_LOG_ENT_DELETE_FL_BG:
rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
len = (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
free_ref_id = XT_GET_DISK_4(record->xf.xf_free_rec_id_4);
record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL &&
record->xf.xf_status_1 != XT_LOG_ENT_DELETE_FL_BG) {
if ((rec_data = xres_load_record(self, ot, rec_id, &record->xf.xf_rec_type_1, len, rec_buf, tab->tab_dic.dic_ind_cols_req))) {
row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
if (!xres_add_index_entries(ot, row_id, rec_id, rec_data))
/* This record was allocated from the free list.
* Because this operation is out of sequence, there
* could have been other allocations from the
* free list before this, that have gone missing.
* For this reason we have to search the current
* free list and remove the record.
link_rec_id = tab->tab_head_rec_free_id;
prev_link_rec_id = 0;
while (link_rec_id) {
if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecFreeDRec), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec, NULL, &self->st_statistics.st_rec, self))
if (link_rec_id == rec_id)
prev_link_rec_id = link_rec_id;
link_rec_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
if (link_rec_id == rec_id) {
/* The block was found on the free list.
if (prev_link_rec_id) {
/* We write the record from position 'link_rec_id' into
* position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, prev_link_rec_id), sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_rec))
tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
free_ref_id = tab->tab_head_rec_free_id;
/* The block is at the front of the list: */
free_ref_id = XT_GET_DISK_4(free_rec.rf_next_rec_id_4);
/* Not found on the free list? */
if (tab->tab_head_rec_eof_id < rec_id + 1)
tab->tab_head_rec_eof_id = rec_id + 1;
goto write_mod_data;
if (tab->tab_head_rec_eof_id < rec_id + 1)
tab->tab_head_rec_eof_id = rec_id + 1;
tab->tab_head_rec_free_id = free_ref_id;
tab->tab_head_rec_fnum--;
WRITE_REC_FILE(self, ot, rec_id, 0, len, (xtWord1 *) &record->xf.xf_rec_type_1, in_sequence, ws);
tab->tab_bytes_to_flush += len;
tab->tab_flush_pending = TRUE;
case XT_DEFUNKT_REC_REMOVED:
case XT_DEFUNKT_REC_REMOVED_EXT: {
xtBool record_loaded;
XTTabRecExtDPtr ext_rec;
xtWord4 log_over_size = 0;
xtLogID data_log_id = 0;
xtLogOffset data_log_offset = 0;
u_int cols_required = 0;
rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
/* This is a short-cut, it does not require loading the record: */
if (!check_index && !tab->tab_dic.dic_blob_count && record->fr.fr_status_1 != XT_DEFUNKT_REC_REMOVED_EXT)
ext_rec = (XTTabRecExtDPtr) ot->ot_row_rbuffer;
if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), tab->tab_dic.dic_rec_size, 0, ext_rec, &red_size, &self->st_statistics.st_rec, self)) {
xt_log_and_clear_exception_ns();
if (red_size < sizeof(XTTabRecHeadDRec))
/* Check that the record is the same as the one originally removed.
* This can be different if recovery is repeated.
* log=21 offset=6304472 REMOVED-X REC op=360616 tab=7 rec=25874
* log=21 offset=6309230 UPDATE-FL op=360618 tab=7 rec=25874 row=26667 log=1 offset=26503077 xact=209
* log=21 offset=6317500 CLEAN REC op=360631 tab=7 rec=25874
* If this recovery sequence is repeated, then the REMOVED-X will free the
* extended record belonging to the update that came afterwards!
* Additional situation to consider:
* - A record "x" is created, and index entries created.
* - A checkpoint is completed.
* - Record "x" is deleted due to an UPDATE.
* - The index entries are removed, but the index is not
* - This deletion is written to disk by the writer.
* So we have the situation that the remove is on disk,
* but the index changes have not been made.
* In this case, skipping to "do_rec_freed" is incorrect.
if (record->fr.fr_stat_id_1 != ext_rec->tr_stat_id_1 ||
XT_GET_DISK_4(record->fr.fr_xact_id_4) != XT_GET_DISK_4(ext_rec->tr_xact_id_4))
goto dont_remove_x_record;
if (status == XT_DEFUNKT_REC_REMOVED_EXT) {
if (!XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1))
goto dont_remove_x_record;
if (red_size < offsetof(XTTabRecExtDRec, re_data))
goto dont_remove_x_record;
/* Save this for later (can be overwritten by xres_load_record()): */
data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
dont_remove_x_record:
record_loaded = FALSE;
cols_required = tab->tab_dic.dic_ind_cols_req;
if (tab->tab_dic.dic_blob_cols_req > cols_required)
cols_required = tab->tab_dic.dic_blob_cols_req;
if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
record_loaded = TRUE;
xres_remove_index_entries(ot, rec_id, rec_data);
if (tab->tab_dic.dic_blob_count) {
if (!record_loaded) {
if (tab->tab_dic.dic_blob_cols_req > cols_required)
cols_required = tab->tab_dic.dic_blob_cols_req;
if (!(rec_data = xres_load_record(self, ot, rec_id, ot->ot_row_rbuffer, red_size, rec_buf, cols_required)))
/* [(7)] REMOVE is followed by FREE:
goto get_rec_offset;
record_loaded = TRUE;
if (status == XT_DEFUNKT_REC_REMOVED_EXT) {
/* Note: dlb_delete_log() may be repeated, but should handle this:
* log=5 offset=213334 CLEAN REC op=28175 tab=1 rec=317428
* log=6 offset=321063 REMOVED-X REC op=33878 tab=1 rec=317428
* When this sequence is repeated during recovery, then CLEAN REC
* will reset the status byte of the record so that it
* comes back to here!
* The check for zero is probably not required here.
if (data_log_id && data_log_offset && log_over_size) {
if (ot->ot_table->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE)
xt_tab_free_ext_slot(tab, data_log_id, data_log_offset, log_over_size);
if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
xt_log_and_clear_exception_ns();
case XT_LOG_ENT_REC_REMOVED_BI:
case XT_LOG_ENT_REC_REMOVED_BI_L:
* For deletion we need the complete before image because of the following problem.
* DROP TABLE IF EXISTS t1;
* CREATE TABLE t1 (ID int primary key auto_increment, value int, index (value)) engine=pbxt;
* insert t1(value) values(50);
* update t1 set value = 60;
* update t1 set value = 70;
* select value from t1;
* 081203 12:11:46 [Note] PBXT: Recovering from 1-148, bytes to read: 33554284
* log=1 offset=148 UPDATE-BG op=5 tab=1 rec=2 row=1 xact=3
* log=1 offset=188 REC ADD ROW op=6 tab=1 row=1
* log=1 offset=206 COMMIT xact=3
* log=1 offset=216 REMOVED REC op=7 tab=1 rec=1 xact=2
* log=1 offset=241 CLEAN REC op=8 tab=1 rec=2
* log=1 offset=261 CLEANUP xact=3
* log=1 offset=267 UPDATE-FL-BG op=9 tab=1 rec=1 row=1 xact=4
* log=1 offset=311 REC ADD ROW op=10 tab=1 row=1
* log=1 offset=329 COMMIT xact=4
* log=1 offset=339 REMOVED REC op=11 tab=1 rec=2 xact=3
* log=1 offset=364 CLEAN REC op=12 tab=1 rec=1
* log=1 offset=384 CLEANUP xact=4
* 081203 12:12:15 [Note] PBXT: Recovering complete at 1-390, bytes read: 33554284
* mysql> select value from t1;
* 2 rows in set (55.99 sec)
* mysql> select * from t1;
* 1 row in set (0.00 sec)
XTTabRecExtDPtr ext_rec;
xtWord4 log_over_size = 0;
xtLogID data_log_id = 0;
xtLogOffset data_log_offset = 0;
u_int cols_required = 0;
xtBool record_loaded;
rec_id = XT_GET_DISK_4(record->rb.rb_rec_id_4);
rec_size = XT_GET_DISK_2(record->rb.rb_size_2);
if (status == XT_LOG_ENT_REC_REMOVED_BI_L)
ext_rec = (XTTabRecExtDPtr) &record->bl.bl_rec_type_1;
ext_rec = (XTTabRecExtDPtr) &record->rb.rb_rec_type_1;
if (XT_REC_IS_EXT_DLOG(ext_rec->tr_rec_type_1)) {
/* Save this for later (can be overwritten by xres_load_record()): */
data_log_id = XT_GET_DISK_2(ext_rec->re_log_id_2);
data_log_offset = XT_GET_DISK_6(ext_rec->re_log_offs_6);
log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
record_loaded = FALSE;
cols_required = tab->tab_dic.dic_ind_cols_req;
if (!(rec_data = xres_load_record(self, ot, rec_id, (xtWord1 *) ext_rec, rec_size, rec_buf, cols_required)))
record_loaded = TRUE;
xres_remove_index_entries(ot, rec_id, rec_data);
if (data_log_id && data_log_offset && log_over_size) {
if (ot->ot_table->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE)
xt_tab_free_ext_slot(tab, data_log_id, data_log_offset, log_over_size);
if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
xt_log_and_clear_exception_ns();
/* Use the new record type: */
ext_rec->tr_rec_type_1 = record->rb.rb_new_rec_type_1;
1348
free_data = (XTTabRecFreeDPtr) ext_rec;
1350
if (status == XT_LOG_ENT_REC_REMOVED_BI)
1356
XTTabRecFreeDRec prev_free_rec;
1357
xtRecordID prev_rec_id;
1359
prev_rec_id = XT_GET_DISK_4(record->bl.bl_prev_rec_id_4);
1360
XT_SET_DISK_4(prev_free_rec.rf_next_rec_id_4, rec_id);
1361
WRITE_REC_FILE(self, ot, prev_rec_id, offsetof(XTTabRecFreeDRec, rf_next_rec_id_4), 4, prev_free_rec.rf_next_rec_id_4, in_sequence, ws);
1362
tab->tab_bytes_to_flush += 4;
1363
tab->tab_flush_pending = TRUE;
1365
tab->tab_head_rec_fnum++;
1366
WRITE_REC_FILE(self, ot, rec_id, 0, sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, in_sequence, ws);
1367
tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
1368
tab->tab_flush_pending = TRUE;
1371
case XT_DEFUNKT_REC_FREED:
1372
rec_id = XT_GET_DISK_4(record->fr.fr_rec_id_4);
1373
free_data = (XTTabRecFreeDPtr) &record->fr.fr_rec_type_1;
1379
* We place the record on front of the current
1382
* However, before we do this, we remove the record
1383
* from its row list, if the record is on a row list.
1385
* We do this here, because in the normal removal
1386
* from the row list uses the operations:
1388
* XT_LOG_ENT_REC_UNLINKED, XT_LOG_ENT_ROW_SET and
1389
* XT_LOG_ENT_ROW_FREED.
1391
* When operations are performed out of sequence,
1392
* these operations are ignored for the purpose
1393
* of removing the record from the row.
1395
if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head, NULL, &self->st_statistics.st_rec, self))
1397
/* The record is already free: */
1398
if (XT_REC_IS_FREE(rec_head.tr_rec_type_1))
1400
row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
1402
/* Search the row for this record: */
1403
if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, NULL, &self->st_statistics.st_rec, self))
1405
link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1406
prev_link_rec_id = 0;
1407
while (link_rec_id) {
1408
if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, link_rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &red_size, &self->st_statistics.st_rec, self)) {
1409
xt_log_and_clear_exception(self);
1412
if (red_size < sizeof(XTTabRecHeadDRec))
1414
if (link_rec_id == rec_id)
1416
if (XT_GET_DISK_4(rec_head.tr_row_id_4) != row_id)
1418
switch (rec_head.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
1419
case XT_TAB_STATUS_FREED:
1421
case XT_TAB_STATUS_DELETE:
1422
case XT_TAB_STATUS_FIXED:
1423
case XT_TAB_STATUS_VARIABLE:
1424
case XT_TAB_STATUS_EXT_DLOG:
1430
if (rec_head.tr_rec_type_1 & ~(XT_TAB_STATUS_CLEANED_BIT | XT_TAB_STATUS_MASK)) {
1434
prev_link_rec_id = link_rec_id;
1435
link_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1439
if (link_rec_id == rec_id) {
1440
/* The record was found on the row list, remove it: */
1441
if (prev_link_rec_id) {
1442
/* We write the previous variation pointer from position 'link_rec_id' into
1443
* variation pointer of the 'prev_link_rec_id' record. This unlinks 'link_rec_id'!
1445
if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, prev_link_rec_id) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4), XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head.tr_prev_rec_id_4))
1447
tab->tab_bytes_to_flush += XT_RECORD_ID_SIZE;
1450
/* The record is at the front of the row list: */
1451
xtRefID ref_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1452
XT_SET_DISK_4(row_buf.rr_ref_id_4, ref_id);
1453
if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf))
1455
tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1459
/* Now we free the record, by placing it at the front of
1462
XT_SET_DISK_4(free_data->rf_next_rec_id_4, tab->tab_head_rec_free_id);
1464
tab->tab_head_rec_free_id = rec_id;
1465
tab->tab_head_rec_fnum++;
1466
WRITE_REC_FILE(self, ot, rec_id, 0, sizeof(XTTabRecFreeDRec), (xtWord1 *) free_data, in_sequence, ws);
1467
tab->tab_bytes_to_flush += sizeof(XTTabRecFreeDRec);
1468
tab->tab_flush_pending = TRUE;
case XT_LOG_ENT_REC_MOVED:
rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
WRITE_REC_FILE(self, ot, rec_id, offsetof(XTTabRecExtDRec, re_log_id_2), len, (xtWord1 *) &record->xw.xw_rec_type_1, in_sequence, ws);
tab->tab_bytes_to_flush += len;
tab->tab_flush_pending = TRUE;
case XT_LOG_ENT_REC_CLEANED:
len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
goto get_rec_offset;
case XT_LOG_ENT_REC_CLEANED_1:
goto get_rec_offset;
case XT_LOG_ENT_REC_UNLINKED:
/* Unlink the record.
* This is done when the record is freed.
len = offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
rec_id = XT_GET_DISK_4(record->xw.xw_rec_id_4);
WRITE_REC_FILE(self, ot, rec_id, 0, len, (xtWord1 *) &record->xw.xw_rec_type_1, in_sequence, ws);
tab->tab_bytes_to_flush += len;
tab->tab_flush_pending = TRUE;
case XT_LOG_ENT_ROW_NEW:
len = offsetof(XTactRowAddedEntryDRec, xa_free_list_4);
row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
/* A row was allocated from the EOF. Because operations are missing,
* the blocks between the current EOF and the new EOF need to be
* placed on the free list!
while (tab->tab_head_row_eof_id < row_id) {
XT_SET_DISK_4(row_buf.rr_ref_id_4, tab->tab_head_row_free_id);
if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, tab->tab_head_row_eof_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf))
tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
tab->tab_head_row_free_id = tab->tab_head_row_eof_id;
tab->tab_head_row_eof_id++;
if (tab->tab_head_row_eof_id < row_id + 1)
tab->tab_head_row_eof_id = row_id + 1;
tab->tab_flush_pending = TRUE;
case XT_LOG_ENT_ROW_NEW_FL:
len = sizeof(XTactRowAddedEntryDRec);
row_id = XT_GET_DISK_4(record->xa.xa_row_id_4);
free_ref_id = XT_GET_DISK_4(record->xa.xa_free_list_4);
/* The record was taken from the free list.
* If the operations were in sequence, then this would be
* the front of the free list now.
* However, because operations are missing, it may no
* longer be the front of the free list!
* Search and remove:
link_rec_id = tab->tab_head_row_free_id;
prev_link_rec_id = 0;
while (link_rec_id) {
if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, link_rec_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &red_size, &self->st_statistics.st_rec, self)) {
xt_log_and_clear_exception(self);
if (red_size < sizeof(XTTabRowRefDRec))
if (link_rec_id == row_id)
prev_link_rec_id = link_rec_id;
link_rec_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
if (link_rec_id == row_id) {
/* The block was found on the free list, remove it: */
if (prev_link_rec_id) {
/* We write the record from position 'link_rec_id' into
* position 'prev_link_rec_id'. This unlinks 'link_rec_id'!
if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, prev_link_rec_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf))
tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
free_ref_id = tab->tab_head_row_free_id;
/* The block is at the front of the free list: */
free_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
if (tab->tab_head_row_eof_id < row_id + 1)
tab->tab_head_row_eof_id = row_id + 1;
if (tab->tab_head_row_eof_id < row_id + 1)
tab->tab_head_row_eof_id = row_id + 1;
tab->tab_head_row_free_id = free_ref_id;
tab->tab_head_row_fnum--;
tab->tab_flush_pending = TRUE;
case XT_LOG_ENT_ROW_FREED:
1576
row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1579
* Since this operation is being performed out of sequence, we
1580
* must assume that some other free and allocation operations
1582
* For this reason, we add the row to the front of the
1583
* existing free list.
1585
XT_SET_DISK_4(record->wr.wr_ref_id_4, tab->tab_head_row_free_id);
1587
tab->tab_head_row_free_id = row_id;
1588
tab->tab_head_row_fnum++;
1589
goto write_row_data;
1590
case XT_LOG_ENT_ROW_ADD_REC:
1591
row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1593
if (!XT_PREAD_RR_FILE(ot->ot_row_file, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), 0, (xtWord1 *) &row_buf, &tfer, &self->st_statistics.st_rec, self))
1595
if (tfer == sizeof(XTTabRowRefDRec)) {
1596
/* Add a record to the front of the row.
1597
* This is easy, but we have to make sure that the next
1598
* pointer in the record is correct.
1600
rec_id = XT_GET_DISK_4(record->wr.wr_ref_id_4);
1601
if (!XT_PREAD_RR_FILE(ot->ot_rec_file, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), 0, (xtWord1 *) &rec_head, &tfer, &self->st_statistics.st_rec, self))
1603
if (tfer == sizeof(XTTabRecHeadDRec) && XT_GET_DISK_4(rec_head.tr_row_id_4) == row_id) {
1604
/* This is now the correct next pointer: */
1605
xtRecordID next_ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1606
if (XT_GET_DISK_4(rec_head.tr_prev_rec_id_4) != next_ref_id &&
1607
rec_id != next_ref_id) {
1608
XT_SET_DISK_4(rec_head.tr_prev_rec_id_4, next_ref_id);
1609
if (!xt_tab_write_rec(ot, xt_rec_id_to_rec_offset(tab, rec_id), sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
1611
tab->tab_bytes_to_flush += sizeof(XTTabRecHeadDRec);
1616
goto write_row_data;
1617
case XT_LOG_ENT_ROW_SET:
1619
/* This operation is ignored when out of sequence!
1620
* The operation is used to remove a record from a row.
1621
* This is done automatically when the record is freed.
1624
row_id = XT_GET_DISK_4(record->wr.wr_row_id_4);
1626
ASSERT_NS(XT_GET_DISK_4(record->wr.wr_ref_id_4) < tab->tab_head_rec_eof_id);
1627
if (!xt_tab_write_row(ot, xt_row_id_to_row_offset(tab, row_id), sizeof(XTTabRowRefDRec), (xtWord1 *) &record->wr.wr_ref_id_4))
1629
tab->tab_bytes_to_flush += sizeof(XTTabRowRefDRec);
1630
if (tab->tab_head_row_eof_id < row_id + 1)
1631
tab->tab_head_row_eof_id = row_id + 1;
1632
tab->tab_flush_pending = TRUE;
1634
case XT_LOG_ENT_NO_OP:
1635
case XT_LOG_ENT_END_OF_LOG:
1640
#ifdef XT_REC_FLUSH_THRESHOLD
1641
if (self->st_statistics.st_rec.ts_write - tab->tab_rec_wr_last_flush > XT_REC_FLUSH_THRESHOLD) {
1642
tab->tab_rec_wr_last_flush = self->st_statistics.st_rec.ts_write;
1644
if (!xt_async_flush_record_row(tab, FALSE, self))
1649
#ifdef XT_SORT_REC_WRITES
1650
if (in_sequence && xt_sl_get_size(tab->tab_rec_dw_writes))
1651
tab->tab_rec_dw_op_seq = op_seq;
1653
tab->tab_head_op_seq = op_seq;
1655
tab->tab_head_op_seq = op_seq;
1657
tab->tab_wr_op_seq = op_seq;
* Apply all operations that have been buffered
* for a particular table.
* Operations are buffered if they are
* read from the log out of sequence.
* In this case we buffer, and wait for the
* out of sequence operations to arrive.
* When the server is running, this will always be
* the case. A delay occurs while a transaction
* fills its private log buffer.
static void xres_apply_operations(XTThreadPtr self, XTWriterStatePtr ws, xtBool in_sequence)
XTTableHPtr tab = ws->ws_ot->ot_table;
// XTDatabaseHPtr db, XTOpenTablePtr ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
xt_sl_lock(self, tab->tab_op_list);
op = (XTOperationPtr) xt_sl_item_at(tab->tab_op_list, i);
if (in_sequence && tab->tab_wr_op_seq+1 != op->or_op_seq)
xt_db_set_size(self, &ws->ws_databuf, (size_t) op->or_op_len);
if (!ws->ws_db->db_xlog.xlog_rnd_read(&ws->ws_seqread, op->or_log_id, op->or_log_offset, (size_t) op->or_op_len, ws->ws_databuf.db_data, NULL, self))
check_index = ws->ws_in_recover && xt_comp_log_pos(op->or_log_id, op->or_log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
xres_apply_change(self, ws, (XTXactLogBufferDPtr) ws->ws_databuf.db_data, in_sequence, check_index, op->or_op_seq);
if (tab->tab_wr_wake_freeer) {
if (!XTTableSeq::xt_op_is_before(tab->tab_wr_op_seq, tab->tab_wake_freeer_op))
xt_wr_wake_freeer(self, ws->ws_db);
xt_sl_remove_from_front(self, tab->tab_op_list, i);
xt_sl_unlock(self, tab->tab_op_list);
/* Check for operations still remaining on tables.
* These operations are applied even though operations
* in sequence are missing.
static xtBool xres_sync_operations(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
XTTableEntryPtr te_ptr;
xtBool op_synced = FALSE;
xt_enum_tables_init(&edx);
while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {
/* Dirty read of tab_op_list is OK here, because this is the
* only thread that updates the list!
if ((tab = te_ptr->te_table)) {
if (xt_sl_get_size(tab->tab_op_list)) {
if (xres_open_table(self, ws, te_ptr->te_tab_id))
xres_apply_operations(self, ws, FALSE);
/* Update the pointer cache: */
tab->tab_seq.xt_op_seq_set(self, tab->tab_wr_op_seq+1);
tab->tab_row_eof_id = tab->tab_head_row_eof_id;
tab->tab_row_free_id = tab->tab_head_row_free_id;
tab->tab_row_fnum = tab->tab_head_row_fnum;
tab->tab_rec_eof_id = tab->tab_head_rec_eof_id;
tab->tab_rec_free_id = tab->tab_head_rec_free_id;
tab->tab_rec_fnum = tab->tab_head_rec_fnum;
#ifdef XT_CORRECT_TABLE_FREE_COUNT
#define CORRECT_COUNT TRUE
#define CORRECT_COUNT FALSE
#ifdef XT_CHECK_RECORD_FREE_COUNT
#define CHECK_RECS TRUE
#define CHECK_RECS FALSE
#if defined(XT_CHECK_RECORD_FREE_COUNT) || defined(XT_CHECK_ROW_FREE_COUNT)
#define RECOVER_FREE_COUNTS
#ifdef RECOVER_FREE_COUNTS
/* {CORRECTED-ROW-COUNT}
* This error can be repeated by crashing the server during
* high activity, after flush table writes the table header
* On recovery, the free count "from the future" is used as
* the starting point for subsequent allocations and frees.
* The count is wrong after that point.
* The recovery of the count only works correctly if a
* checkpoint is completed successfully after the table
* header is flushed. Basically the writing of the table
* header should be synchronized with the writing of the
* end of the checkpoint.
* Another solution would be to log the count, along with
* the allocate and free commands.
* The third solution is the one used here. The count is corrected
static void xres_recover_table_free_counts(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
XTTableEntryPtr te_ptr;
xt_enum_tables_init(&edx);
while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {
if ((tab = te_ptr->te_table)) {
if (xres_open_table(self, ws, te_ptr->te_tab_id))
xt_tab_check_free_lists(self, ws->ws_ot, CHECK_RECS, CORRECT_COUNT);
1791
* Operations from the log are applied in sequence order.
1792
* If the operations are out of sequence, they are buffered
1793
* until the missing operations appear.
1795
* NOTE: No lock is required because there should only be
1796
* one thread that does this!
1798
xtPublic void xt_xres_apply_in_order(XTThreadPtr self, XTWriterStatePtr ws, xtLogID log_id, xtLogOffset log_offset, XTXactLogBufferDPtr record)
1805
// XTDatabaseHPtr db, XTOpenTablePtr *ot, XTXactSeqReadPtr sr, XTDataBufferPtr databuf
1806
switch (record->xl.xl_status_1) {
1807
case XT_LOG_ENT_REC_MODIFIED:
1808
case XT_LOG_ENT_UPDATE:
1809
case XT_LOG_ENT_INSERT:
1810
case XT_LOG_ENT_DELETE:
1811
case XT_LOG_ENT_UPDATE_BG:
1812
case XT_LOG_ENT_INSERT_BG:
1813
case XT_LOG_ENT_DELETE_BG:
1814
len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1) + (size_t) XT_GET_DISK_2(record->xu.xu_size_2);
1815
op_seq = XT_GET_DISK_4(record->xu.xu_op_seq_4);
1816
tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
1818
case XT_LOG_ENT_UPDATE_FL:
1819
case XT_LOG_ENT_INSERT_FL:
1820
case XT_LOG_ENT_DELETE_FL:
1821
case XT_LOG_ENT_UPDATE_FL_BG:
1822
case XT_LOG_ENT_INSERT_FL_BG:
1823
case XT_LOG_ENT_DELETE_FL_BG:
1824
len = offsetof(XTactUpdateFLEntryDRec, xf_rec_type_1) + (size_t) XT_GET_DISK_2(record->xf.xf_size_2);
1825
op_seq = XT_GET_DISK_4(record->xf.xf_op_seq_4);
1826
tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
1828
case XT_DEFUNKT_REC_FREED:
1829
case XT_DEFUNKT_REC_REMOVED:
1830
case XT_DEFUNKT_REC_REMOVED_EXT:
1831
/* [(7)] REMOVE is now a extended version of FREE! */
1832
len = offsetof(XTactFreeRecEntryDRec, fr_rec_type_1) + sizeof(XTTabRecFreeDRec);
1833
goto fixed_len_data;
1834
case XT_LOG_ENT_REC_REMOVED_BI:
1835
len = offsetof(XTactRemoveBIEntryDRec, rb_rec_type_1) + (size_t) XT_GET_DISK_2(record->rb.rb_size_2);
1836
op_seq = XT_GET_DISK_4(record->rb.rb_op_seq_4);
1837
tab_id = XT_GET_DISK_4(record->rb.rb_tab_id_4);
1839
case XT_LOG_ENT_REC_REMOVED_BI_L:
1840
len = offsetof(XTactRemoveBILEntryDRec, bl_rec_type_1) + (size_t) XT_GET_DISK_2(record->bl.bl_size_2);
1841
op_seq = XT_GET_DISK_4(record->bl.bl_op_seq_4);
1842
tab_id = XT_GET_DISK_4(record->bl.bl_tab_id_4);
1844
case XT_LOG_ENT_REC_MOVED:
1845
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 8;
1846
goto fixed_len_data;
1847
case XT_LOG_ENT_REC_CLEANED:
1848
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1849
goto fixed_len_data;
1850
case XT_LOG_ENT_REC_CLEANED_1:
1851
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + 1;
1852
goto fixed_len_data;
1853
case XT_LOG_ENT_REC_UNLINKED:
1854
len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1) + offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE;
1856
op_seq = XT_GET_DISK_4(record->xw.xw_op_seq_4);
1857
tab_id = XT_GET_DISK_4(record->xw.xw_tab_id_4);
1859
case XT_LOG_ENT_ROW_NEW:
1860
len = sizeof(XTactRowAddedEntryDRec) - 4;
1862
case XT_LOG_ENT_ROW_NEW_FL:
1863
len = sizeof(XTactRowAddedEntryDRec);
1865
op_seq = XT_GET_DISK_4(record->xa.xa_op_seq_4);
1866
tab_id = XT_GET_DISK_4(record->xa.xa_tab_id_4);
1868
case XT_LOG_ENT_ROW_ADD_REC:
1869
case XT_LOG_ENT_ROW_SET:
1870
case XT_LOG_ENT_ROW_FREED:
1871
len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4) + sizeof(XTTabRowRefDRec);
1872
op_seq = XT_GET_DISK_4(record->wr.wr_op_seq_4);
1873
tab_id = XT_GET_DISK_4(record->wr.wr_tab_id_4);
1875
case XT_LOG_ENT_NO_OP:
1876
case XT_LOG_ENT_END_OF_LOG:
if (!xres_open_table(self, ws, tab_id))
XTTableHPtr tab = ws->ws_ot->ot_table;
* During normal operation this is actually given.
* During recovery, it only applies to the record/row files
* The index file is flushed independently, and changes may
* have been applied to the index (due to a call to flush index,
* which comes as a result of running out of memory) that have not been
* applied to the record/row files.
* As a result we need to do the index checks that apply to this
* At the moment, I will just do everything, which should not
* This error can be repeated by running the test
* runTest(OUT_OF_CACHE_UPDATE_TEST, 32, OUT_OF_CACHE_UPDATE_TEST_UPDATE_COUNT, OUT_OF_CACHE_UPDATE_TEST_SET_SIZE)
* and crashing after a while.
* Do this by setting not_this to NULL. This will cause the test to
* hang after a while. After a restart the indexes are corrupt if the
* ws->ws_in_recover condition is not present here.
if (ws->ws_in_recover) {
if (!tab->tab_op_seq_set) {
/* op_seq <= tab_wr_op_seq + 1: */
ASSERT(XTTableSeq::xt_op_is_before(op_seq, tab->tab_wr_op_seq+2));
if (XTTableSeq::xt_op_is_before(op_seq-1, tab->tab_wr_op_seq))
/* Adjust the operation sequence number: */
tab->tab_wr_op_seq = op_seq-1;
tab->tab_head_op_seq = tab->tab_wr_op_seq;
tab->tab_op_seq_set = TRUE;
if (!XTTableSeq::xt_op_is_before(tab->tab_wr_op_seq, op_seq))
if (tab->tab_wr_op_seq+1 == op_seq) {
/* I could use tab_ind_rec_log_id, but this may be a problem if
* recovery does not recover up to the last committed transaction.
check_index = ws->ws_in_recover && xt_comp_log_pos(log_id, log_offset, ws->ws_ind_rec_log_id, ws->ws_ind_rec_log_offset) >= 0;
xres_apply_change(self, ws, record, TRUE, check_index, op_seq);
if (tab->tab_wr_wake_freeer) {
if (!XTTableSeq::xt_op_is_before(tab->tab_wr_op_seq, tab->tab_wake_freeer_op))
xt_wr_wake_freeer(self, ws->ws_db);
/* Apply any operations in the list that now follow on...
* NOTE: the tab_op_list only has to be locked for modification.
* This is because only one thread ever changes the list
* (on startup and the writer), but the checkpoint thread
if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
if (tab->tab_wr_op_seq+1 == op->or_op_seq) {
xres_apply_operations(self, ws, TRUE);
/* Add the operation to the list: */
op.or_op_seq = op_seq;
op.or_log_id = log_id;
op.or_log_offset = log_offset;
xt_sl_lock(self, tab->tab_op_list);
xt_sl_insert(self, tab->tab_op_list, &op_seq, &op);
//ASSERT(tab->tab_op_list->sl_usage_count < 1000000);
xt_sl_unlock(self, tab->tab_op_list);
/* ----------------------------------------------------------------------
* CHECKPOINTING FUNCTIONALITY
static xtBool xres_delete_data_log(XTDatabaseHPtr db, xtLogID log_id)
XTDataLogFilePtr data_log;
char path[PATH_MAX];
db->db_datalogs.dlc_name(PATH_MAX, path, log_id);
if (!db->db_datalogs.dlc_remove_data_log(log_id, TRUE))
if (xt_fs_exists(path)) {
#ifdef DEBUG_LOG_DELETE
printf("-- delete log: %s\n", path);
if (!xt_fs_delete(NULL, path))
/* The log was deleted: */
if (!db->db_datalogs.dlc_get_data_log(&data_log, log_id, TRUE, NULL))
if (!db->db_datalogs.dls_set_log_state(data_log, XT_DL_DELETED))
static int xres_comp_flush_tabs(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
xtTableID tab_id = *((xtTableID *) a);
XTCheckPointTablePtr cp_tab = (XTCheckPointTablePtr) b;
if (tab_id < cp_tab->cpt_tab_id)
if (tab_id > cp_tab->cpt_tab_id)
static void xres_init_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
xt_init_mutex_with_autoname(self, &cp->cp_state_lock);
cp->cp_inited = TRUE;
static void xres_free_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
cp->cp_inited = FALSE;
xt_free_mutex(&cp->cp_state_lock);
if (cp->cp_table_ids) {
xt_free_sortedlist(self, cp->cp_table_ids);
cp->cp_table_ids = NULL;
* Remove the deleted logs so that they can be re-used.
* This is only possible after a checkpoint has been
* written that does _not_ include these logs as logs
static xtBool xres_remove_data_logs(XTDatabaseHPtr db)
u_int no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_deleted);
xtLogID *log_id_ptr;
for (u_int i=0; i<no_of_logs; i++) {
log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_deleted, i);
if (!db->db_datalogs.dlc_remove_data_log(*log_id_ptr, FALSE))
xt_sl_set_size(db->db_datalogs.dlc_deleted, 0);
/* ----------------------------------------------------------------------
xtPublic void xt_xres_init(XTThreadPtr self, XTDatabaseHPtr db)
xt_init_mutex_with_autoname(self, &db->db_cp_lock);
xt_init_cond(self, &db->db_cp_cond);
xt_init_mutex_with_autoname(self, &db->db_fl_lock);
xres_init_checkpoint_state(self, &db->db_cp_state);
db->db_restart.xres_init(self, db, &db->db_wr_log_id, &db->db_wr_log_offset, &max_log_id);
/* It is also the position where transactions will start writing the
if (!db->db_xlog.xlog_set_write_offset(db->db_wr_log_id, db->db_wr_log_offset, max_log_id, self))
xtPublic void xt_xres_exit(XTThreadPtr self, XTDatabaseHPtr db)
db->db_restart.xres_exit(self);
xres_free_checkpoint_state(self, &db->db_cp_state);
xt_free_mutex(&db->db_cp_lock);
xt_free_cond(&db->db_cp_cond);
xt_free_mutex(&db->db_fl_lock);
/* ----------------------------------------------------------------------
* RESTART FUNCTIONALITY
* Restart the database. This function loads the restart position, and
* applies all changes in the logs, until the end of the log, or
* a corrupted record is found.
* The restart position is the position in the log where we know that
* all the changes up to that point have been flushed to the
* This is called the checkpoint position. The checkpoint position
* is written alternately to 2 restart files.
* To make a checkpoint:
* Get the current log writer log offset.
* Get the log offset of the next operation on the table, if an
* operation is queued for the table.
* Flush that table, and the operation sequence to the table.
* For each unclean transaction:
* Get the log offset of the begin of the transaction.
* Write the lowest of all log offsets to the restart file!
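
/* Illustrative sketch only (not part of PBXT): how a restart position can be
 * chosen as the minimum of several candidate log positions, in the spirit of
 * the xt_comp_log_pos() comparisons used in the code below. All names here are
 * hypothetical; this is just a compact restatement of the checkpoint rule above.
 */
typedef struct ExampleLogPos {
	unsigned int		lp_log_id;		/* log file number */
	unsigned long long	lp_offset;		/* offset within that log */
} ExampleLogPos;

static int example_comp_log_pos(ExampleLogPos a, ExampleLogPos b)
{
	if (a.lp_log_id != b.lp_log_id)
		return a.lp_log_id < b.lp_log_id ? -1 : 1;
	if (a.lp_offset != b.lp_offset)
		return a.lp_offset < b.lp_offset ? -1 : 1;
	return 0;
}

/* Start from the writer position (the theoretical maximum restart position),
 * then pull the position back for every queued table operation and every
 * logged-but-not-cleaned transaction: */
static ExampleLogPos example_restart_position(ExampleLogPos writer_pos, const ExampleLogPos *candidates, size_t count)
{
	ExampleLogPos pos = writer_pos;

	for (size_t i = 0; i < count; i++) {
		if (example_comp_log_pos(candidates[i], pos) < 0)
			pos = candidates[i];
	}
	return pos;
}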
void XTXactRestart::xres_init(XTThreadPtr self, XTDatabaseHPtr db, xtLogID *log_id, xtLogOffset *log_offset, xtLogID *max_log_id)
char path[PATH_MAX];
XTOpenFilePtr of = NULL;
XTXlogCheckpointDPtr res_1_buffer = NULL;
XTXlogCheckpointDPtr res_2_buffer = NULL;
XTXlogCheckpointDPtr use_buffer;
xtLogID ind_rec_log_id = 0;
xtLogOffset ind_rec_log_offset = 0;
ASSERT(!self->st_database);
/* The following call stack:
* XTDatabaseLog::xlog_flush_pending()
* XTDatabaseLog::xlog_flush()
* xt_xlog_flush_log()
* xt_flush_indices()
* idx_out_of_memory_failure()
* xres_remove_index_entries()
* xres_apply_change()
* xt_xres_apply_in_order()
* XTXactRestart::xres_restart()
* XTXactRestart::xres_init()
* Leads to st_database being used!
self->st_database = db;
#ifdef SKIP_STARTUP_CHECKPOINT
/* When debugging, we do not checkpoint immediately, just in case
* we detect a problem during recovery.
xres_cp_required = FALSE;
xres_cp_required = TRUE;
/* Figure out which restart file to use.
xres_name(PATH_MAX, path, 1);
if ((of = xt_open_file(self, path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))) {
res_1_size = (size_t) xt_seek_eof_file(self, of);
res_1_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_1_size);
if (!xt_pread_file(of, 0, res_1_size, res_1_size, res_1_buffer, NULL, &self->st_statistics.st_x, self))
xt_close_file(self, of);
if (!xres_check_checksum(res_1_buffer, res_1_size)) {
xt_free(self, res_1_buffer);
res_1_buffer = NULL;
xres_name(PATH_MAX, path, 2);
if ((of = xt_open_file(self, path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))) {
res_2_size = (size_t) xt_seek_eof_file(self, of);
res_2_buffer = (XTXlogCheckpointDPtr) xt_malloc(self, res_2_size);
if (!xt_pread_file(of, 0, res_2_size, res_2_size, res_2_buffer, NULL, &self->st_statistics.st_x, self))
xt_close_file(self, of);
if (!xres_check_checksum(res_2_buffer, res_2_size)) {
xt_free(self, res_2_buffer);
res_2_buffer = NULL;
if (res_1_buffer && res_2_buffer) {
if (xt_comp_log_pos(
XT_GET_DISK_4(res_1_buffer->xcp_log_id_4),
XT_GET_DISK_6(res_1_buffer->xcp_log_offs_6),
XT_GET_DISK_4(res_2_buffer->xcp_log_id_4),
XT_GET_DISK_6(res_2_buffer->xcp_log_offs_6)) > 0) {
/* The first log is further along than the second: */
xt_free(self, res_2_buffer);
res_2_buffer = NULL;
if (XT_GET_DISK_6(res_1_buffer->xcp_chkpnt_no_6) >
XT_GET_DISK_6(res_2_buffer->xcp_chkpnt_no_6)) {
xt_free(self, res_2_buffer);
res_2_buffer = NULL;
xt_free(self, res_1_buffer);
res_1_buffer = NULL;
use_buffer = res_1_buffer;
xres_next_res_no = 2;
use_buffer = res_2_buffer;
xres_next_res_no = 1;
/* Read the checkpoint data: */
xtTableID xt_tab_id;
xres_cp_number = XT_GET_DISK_6(use_buffer->xcp_chkpnt_no_6);
xres_cp_log_id = XT_GET_DISK_4(use_buffer->xcp_log_id_4);
xres_cp_log_offset = XT_GET_DISK_6(use_buffer->xcp_log_offs_6);
xt_tab_id = XT_GET_DISK_4(use_buffer->xcp_tab_id_4);
if (xt_tab_id > db->db_curr_tab_id)
db->db_curr_tab_id = xt_tab_id;
db->db_xn_curr_id = XT_GET_DISK_4(use_buffer->xcp_xact_id_4);
ind_rec_log_id = XT_GET_DISK_4(use_buffer->xcp_ind_rec_log_id_4);
ind_rec_log_offset = XT_GET_DISK_6(use_buffer->xcp_ind_rec_log_offs_6);
no_of_logs = XT_GET_DISK_2(use_buffer->xcp_log_count_2);
printf("CHECKPOINT log=%d offset=%d ", (int) xres_cp_log_id, (int) xres_cp_log_offset);
printf("DELETED LOGS: ");
/* Logs that are deleted are locked until _after_ the next
* To prevent the following problem from occurring:
* - Recovery is performed, and log X is deleted
* - After delete a log is free for re-use.
* New data is written to log X.
* - Recovery is performed from previous checkpoint,
* and log X is deleted again.
* To lock the logs, they are placed on the deleted list.
* After the next checkpoint, all logs on this list
for (u_int i=0; i<no_of_logs; i++) {
xt_log_id = (xtLogID) XT_GET_DISK_2(use_buffer->xcp_del_log[i]);
printf("%d", (int) xt_log_id);
#ifdef DEBUG_KEEP_LOGS
xt_dl_set_to_delete(self, db, xt_log_id);
if (!xres_delete_data_log(db, xt_log_id))
/* Try to determine the correct start point. */
xres_cp_log_id = xt_xlog_get_min_log(self, db);
xres_cp_log_offset = 0;
ind_rec_log_id = xres_cp_log_id;
ind_rec_log_offset = xres_cp_log_offset;
printf("CHECKPOINT log=1 offset=0\n");
xt_free(self, res_1_buffer);
res_1_buffer = NULL;
xt_free(self, res_2_buffer);
res_2_buffer = NULL;
if (!xres_restart(self, log_id, log_offset, ind_rec_log_id, ind_rec_log_offset, max_log_id))
xt_enter_exception_handler(self, &e);
self->st_database = NULL;
xt_close_file_ns(of);
xt_free_ns(res_1_buffer);
xt_free_ns(res_2_buffer);
xt_exit_exception_handler(self, &e);
self->st_database = NULL;
void XTXactRestart::xres_exit(XTThreadPtr XT_UNUSED(self))
void XTXactRestart::xres_name(size_t size, char *path, xtLogID log_id)
sprintf(name, "restart-%lu.xt", (u_long) log_id);
xt_strcpy(size, path, xres_db->db_main_path);
xt_add_system_dir(size, path);
xt_add_dir_char(size, path);
xt_strcat(size, path, name);
xtBool XTXactRestart::xres_check_checksum(XTXlogCheckpointDPtr buffer, size_t size)
/* The minimum size: */
if (size < offsetof(XTXlogCheckpointDRec, xcp_head_size_4) + 4)
/* Check the sizes: */
head_size = XT_GET_DISK_4(buffer->xcp_head_size_4);
if (size < head_size)
if (XT_GET_DISK_2(buffer->xcp_checksum_2) != xt_get_checksum(((xtWord1 *) buffer) + 2, size - 2, 1))
if (XT_GET_DISK_2(buffer->xcp_version_2) != XT_CHECKPOINT_VERSION)
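
/* Illustrative sketch only (not part of PBXT): the restart-file validation
 * pattern used by xres_check_checksum() above -- the stored 2-byte checksum
 * covers everything after the checksum field itself, and the file is rejected
 * if it is too small, torn, or written by a newer version. example_checksum()
 * is a stand-in for xt_get_checksum(); it is not the real algorithm.
 */
static unsigned short example_checksum(const unsigned char *data, size_t len)
{
	unsigned int sum = 0;

	for (size_t i = 0; i < len; i++)
		sum = (sum << 1) ^ data[i];			/* any stable 16-bit digest will do for the sketch */
	return (unsigned short) (sum & 0xFFFF);
}

static int example_restart_file_ok(const unsigned char *buf, size_t size,
	unsigned short stored_checksum, unsigned int stored_version, unsigned int expected_version)
{
	if (size < 8)							/* too small to contain the fixed header */
		return 0;
	if (stored_checksum != example_checksum(buf + 2, size - 2))
		return 0;							/* file is torn or corrupt */
	return stored_version == expected_version;
}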
void XTXactRestart::xres_recover_progress(XTThreadPtr self, XTOpenFilePtr *of, int perc)
#ifdef XT_USE_GLOBAL_DB
char file_path[PATH_MAX];
xt_close_file(self, *of);
xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
if (xt_fs_exists(file_path))
xt_fs_delete(self, file_path);
char file_path[PATH_MAX];
xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
*of = xt_open_file(self, file_path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 16*1024*1024);
xt_set_eof_file(self, *of, 0);
sprintf(number, "%d", perc);
if (!xt_pwrite_file(*of, 0, strlen(number), number, &self->st_statistics.st_x, self))
if (!xt_flush_file(*of, &self->st_statistics.st_x, self))
xtBool XTXactRestart::xres_restart(XTThreadPtr self, xtLogID *log_id, xtLogOffset *log_offset, xtLogID ind_rec_log_id, xtLogOffset ind_rec_log_offset, xtLogID *max_log_id)
XTDatabaseHPtr db = xres_db;
XTXactLogBufferDPtr record;
XTWriterStateRec ws;
off_t bytes_read = 0;
off_t bytes_to_read;
volatile xtBool print_progress = FALSE;
volatile off_t perc_size = 0, next_goal = 0;
int perc_complete = 1, perc_to_write = 1;
XTOpenFilePtr progress_file = NULL;
xtBool min_ram_xn_id_set = FALSE;
memset(&ws, 0, sizeof(ws));
ws.ws_in_recover = TRUE;
ws.ws_ind_rec_log_id = ind_rec_log_id;
ws.ws_ind_rec_log_offset = ind_rec_log_offset;
/* Initialize the data log buffer (required if extended data is
* Note: this buffer is freed later. It is part of the thread
* "open database" state, and this means that a thread
* may not have another database open (in use) when
* it calls this function.
self->st_dlog_buf.dlb_init(db, xt_db_log_buffer_size);
if (!db->db_xlog.xlog_seq_init(&ws.ws_seqread, xt_db_log_buffer_size, TRUE))
bytes_to_read = xres_bytes_to_read(self, db, &log_count, max_log_id);
/* Don't print anything about recovering an empty database: */
if (bytes_to_read != 0)
xt_logf(XT_NT_INFO, "PBXT: Recovering from %lu-%llu, bytes to read: %llu\n", (u_long) xres_cp_log_id, (u_llong) xres_cp_log_offset, (u_llong) bytes_to_read);
print_progress = FALSE;
start_time = time(NULL);
perc_size = bytes_to_read / 100;
next_goal = perc_size;
if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, xres_cp_log_id, xres_cp_log_offset, FALSE)) {
if (!db->db_xlog.xlog_seq_next(&ws.ws_seqread, &record, TRUE, self)) {
/* Increment before. If record is NULL then xseq_record_len will be zero,
* UNLESS the last record was of type XT_LOG_ENT_END_OF_LOG,
* which fills the log to align to a block of size 512.
bytes_read += ws.ws_seqread.xseq_record_len;
#ifdef PRINT_LOG_ON_RECOVERY
xt_print_log_record(ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
if (next_goal && bytes_read >= next_goal) {
while (bytes_read >= next_goal) {
next_goal += perc_size;
if (!print_progress) {
if (time(NULL) - start_time > 2)
print_progress = TRUE;
if (print_progress) {
while (perc_to_write < perc_complete) {
if (((perc_to_write - 1) % 25) == 0)
xt_logf(XT_NT_INFO, "PBXT: ");
if ((perc_to_write % 25) == 0)
xt_logf(XT_NT_INFO, "%2d\n", (int) perc_to_write);
xt_logf(XT_NT_INFO, "%2d ", (int) perc_to_write);
xres_recover_progress(self, &progress_file, perc_to_write);
switch (record->xl.xl_status_1) {
case XT_LOG_ENT_HEADER:
case XT_LOG_ENT_NEW_LOG: {
/* Adjust the bytes read for the fact that logs are written
* on 512 byte boundaries.
off_t offs, eof = ws.ws_seqread.xseq_log_eof;
offs = ws.ws_seqread.xseq_rec_log_offset + ws.ws_seqread.xseq_record_len;
bytes_read += eof - offs;
if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, TRUE))
case XT_LOG_ENT_NEW_TAB:
tab_id = XT_GET_DISK_4(record->xt.xt_tab_id_4);
if (tab_id > db->db_curr_tab_id)
db->db_curr_tab_id = tab_id;
case XT_LOG_ENT_UPDATE_BG:
case XT_LOG_ENT_INSERT_BG:
case XT_LOG_ENT_DELETE_BG:
xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
case XT_LOG_ENT_UPDATE_FL_BG:
case XT_LOG_ENT_INSERT_FL_BG:
case XT_LOG_ENT_DELETE_FL_BG:
xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
if (xt_xn_is_before(db->db_xn_curr_id, xn_id))
db->db_xn_curr_id = xn_id;
if (!(xact = xt_xn_add_old_xact(db, xn_id, self)))
xact->xd_begin_log = ws.ws_seqread.xseq_rec_log_id;
xact->xd_begin_offset = ws.ws_seqread.xseq_rec_log_offset;
xact->xd_end_xn_id = xn_id;
xact->xd_end_time = db->db_xn_end_time;
xact->xd_flags = (XT_XN_XAC_LOGGED | XT_XN_XAC_ENDED | XT_XN_XAC_RECOVERED | XT_XN_XAC_SWEEP);
/* This may affect the "minimum RAM transaction": */
if (!min_ram_xn_id_set || xt_xn_is_before(xn_id, db->db_xn_min_ram_id)) {
min_ram_xn_id_set = TRUE;
db->db_xn_min_ram_id = xn_id;
xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
case XT_LOG_ENT_COMMIT:
case XT_LOG_ENT_ABORT:
xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
if ((xact = xt_xn_get_xact(db, xn_id, self))) {
xact->xd_end_xn_id = xn_id;
xact->xd_flags |= XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
xact->xd_flags &= ~XT_XN_XAC_RECOVERED; // We can expect an end record on cleanup!
xact->xd_flags &= ~XT_XN_XAC_PREPARED; // Prepared transactions cannot be swept!
if (record->xl.xl_status_1 == XT_LOG_ENT_COMMIT)
xact->xd_flags |= XT_XN_XAC_COMMITTED;
if (xt_sl_get_size(db->db_xn_xa_list) > 0)
xt_xn_delete_xa_data_by_xact(db, xn_id, self);
case XT_LOG_ENT_CLEANUP:
/* The transaction was cleaned up: */
xn_id = XT_GET_DISK_4(record->xc.xc_xact_id_4);
xt_xn_delete_xact(db, xn_id, self);
case XT_LOG_ENT_OP_SYNC:
xres_sync_operations(self, db, &ws);
case XT_LOG_ENT_DEL_LOG:
rec_log_id = XT_GET_DISK_4(record->xl.xl_log_id_4);
xt_dl_set_to_delete(self, db, rec_log_id);
case XT_LOG_ENT_PREPARE:
xn_id = XT_GET_DISK_4(record->xp.xp_xact_id_4);
if ((xact = xt_xn_get_xact(db, xn_id, self))) {
xact->xd_flags |= XT_XN_XAC_PREPARED;
if (!xt_xn_store_xa_data(db, xn_id, record->xp.xp_xa_len_1, record->xp.xp_xa_data, self))
xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
if (xres_sync_operations(self, db, &ws)) {
XTactOpSyncEntryDRec op_sync;
time_t now = time(NULL);
op_sync.os_status_1 = XT_LOG_ENT_OP_SYNC;
op_sync.os_checksum_1 = XT_CHECKSUM_1(now) ^ XT_CHECKSUM_1(ws.ws_seqread.xseq_rec_log_id);
XT_SET_DISK_4(op_sync.os_time_4, (xtWord4) now);
/* TODO: If this is done, check to see that
* the bytes written here are read back by the writer.
* This is in order to be in sync with 'xl_log_bytes_written'.
* i.e. xl_log_bytes_written == xl_log_bytes_read
if (!db->db_xlog.xlog_write_thru(&ws.ws_seqread, sizeof(XTactOpSyncEntryDRec), (xtWord1 *) &op_sync, self))
#ifdef XT_SORT_REC_WRITES
/* Flush all tables where we still have cached writes: */
xt_xres_flush_all(self, &ws);
if (print_progress) {
while (perc_complete <= 100) {
if (((perc_complete - 1) % 25) == 0)
xt_logf(XT_NT_INFO, "PBXT: ");
if ((perc_complete % 25) == 0)
xt_logf(XT_NT_INFO, "%2d\n", (int) perc_complete);
xt_logf(XT_NT_INFO, "%2d ", (int) perc_complete);
xres_recover_progress(self, &progress_file, perc_complete);
if (bytes_to_read != 0)
xt_logf(XT_NT_INFO, "PBXT: Recovery complete at %lu-%llu, bytes read: %llu\n", (u_long) ws.ws_seqread.xseq_rec_log_id, (u_llong) ws.ws_seqread.xseq_rec_log_offset, (u_llong) bytes_read);
*log_id = ws.ws_seqread.xseq_rec_log_id;
*log_offset = ws.ws_seqread.xseq_rec_log_offset;
if (!min_ram_xn_id_set)
/* This is true because if no transaction was placed in RAM then
* the next transaction in RAM will have the next ID: */
db->db_xn_min_ram_id = db->db_xn_curr_id + 1;
#ifdef RECOVER_FREE_COUNTS
if (xres_cp_log_id != *log_id || xres_cp_log_offset != *log_offset) {
/* Recovery took place, correct the row count! */
xres_recover_table_free_counts(self, db, &ws);
xt_free_writer_state(self, &ws);
self->st_dlog_buf.dlb_exit(self);
xres_recover_progress(self, &progress_file, 101);
xtBool XTXactRestart::xres_is_checkpoint_pending(xtLogID curr_log_id, xtLogOffset curr_log_offset)
return xt_bytes_since_last_checkpoint(xres_db, curr_log_id, curr_log_offset) >= xt_db_checkpoint_frequency;
* Calculate the bytes to be read for recovery.
* This is only an estimate of the number of bytes that
off_t XTXactRestart::xres_bytes_to_read(XTThreadPtr self, XTDatabaseHPtr db, u_int *log_count, xtLogID *max_log_id)
off_t to_read = 0, eof;
xtLogID log_id = xres_cp_log_id;
char log_path[PATH_MAX];
XTXactLogHeaderDRec log_head;
xtWord1 end_head[4];
*max_log_id = log_id;
db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
if (!xt_open_file_ns(&of, log_path, XT_FT_STANDARD, XT_FS_MISSING_OK, 16*1024*1024))
pushr_(xt_close_file, of);
/* Check the first record of the log, to see if it is valid. */
if (!xt_pread_file(of, 0, sizeof(XTXactLogHeaderDRec), 0, (xtWord1 *) &log_head, &red_size, &self->st_statistics.st_xlog, self))
/* The minimum size (old log size): */
if (red_size < XT_MIN_LOG_HEAD_SIZE)
goto header_corrupt;
head_size = XT_GET_DISK_4(log_head.xh_size_4);
if (log_head.xh_status_1 != XT_LOG_ENT_HEADER)
goto header_corrupt;
if (log_head.xh_checksum_1 != XT_CHECKSUM_1(log_id))
goto header_corrupt;
if (!xt_pread_file(of, head_size - 4, 4, 0, end_head, &red_size, &self->st_statistics.st_xlog, self))
if (red_size != 4 || XT_GET_DISK_4(end_head) != XT_LOG_FILE_MAGIC)
goto header_corrupt;
if (head_size > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
if (XT_GET_DISK_4(log_head.xh_log_id_4) != log_id)
goto header_corrupt;
if (head_size > offsetof(XTXactLogHeaderDRec, xh_version_2) + 2) {
if (XT_GET_DISK_2(log_head.xh_version_2) > XT_LOG_VERSION_NO)
xt_throw_ulxterr(XT_CONTEXT, XT_ERR_NEW_TYPE_OF_XLOG, (u_long) log_id);
eof = xt_seek_eof_file(self, of);
freer_(); // xt_close_file(of)
if (log_id == xres_cp_log_id)
to_read += (eof - xres_cp_log_offset);
*max_log_id = log_id;
if (log_id == xres_cp_log_id && xres_cp_log_offset > (xtLogOffset) sizeof(XTXactLogHeaderDRec))
xt_throw_ulxterr(XT_CONTEXT, XT_ERR_LOG_HEADER_CORRUPT, (u_long) log_id);
freer_(); // xt_close_file(of)
/* ----------------------------------------------------------------------
* C H E C K P O I N T   P R O C E S S
#ifdef NEVER_CHECKPOINT
xtBool no_checkpoint = TRUE;
#define XT_CHECKPOINT_IF_NO_ACTIVITY 0
#define XT_CHECKPOINT_PAUSE_IF_ACTIVITY 1
#define XT_CHECKPOINT_NO_PAUSE 2
#define XT_ASYNC_THREAD_FLUSH_LIMIT 200
static void xres_cp_async_checkpoint(XTThreadPtr self, XTDatabaseHPtr db, u_int curr_writer_total, xtBool force_checkpoint)
XTTableHPtr tab = NULL;
int start_count = 0;
/* Start a checkpoint (if one is not already running): */
if (!xt_begin_checkpoint(db, FALSE, self))
while (!self->t_quit) {
if (!xres_get_next_to_flush(db, &tab_id, &flush_bit, FALSE, TRUE))
goto end_checkpoint;
if (!xt_wait_for_async_task_results(self))
if (tab->tab_id != tab_id) {
xt_heap_release(self, tab);
if (!(tab = xt_use_table_by_id(self, db, tab_id, NULL))) {
xt_throw_ulxterr(XT_CONTEXT, XT_ERR_TABLE_NOT_FOUND, (u_long) tab_id);
if (flush_bit == XT_CPT_REC_ROW_FLUSHED) {
if (!xt_async_flush_record_row(tab, TRUE, self))
ASSERT(flush_bit == XT_CPT_INDEX_FLUSHED);
if (!xt_async_flush_indices(tab, TRUE, FALSE, self))
if (!force_checkpoint) {
if (curr_writer_total != db->db_xn_total_writer_count)
goto end_checkpoint;
if (start_count >= XT_ASYNC_THREAD_FLUSH_LIMIT) {
if (!xt_wait_for_async_task_results(self))
xt_heap_release(self, tab);
if (!xt_wait_for_async_task_results(self))
if (!xt_end_checkpoint(db, self, NULL))
xt_heap_release(self, tab);
* This function performs table flushes as long as the system is idle.
static void xres_cp_checkpoint(XTThreadPtr self, XTDatabaseHPtr db, u_int curr_writer_total, xtBool force_checkpoint)
XTOpenTablePtr ot = NULL;
off_t bytes_flushed = 0;
#ifdef NEVER_CHECKPOINT
if (force_checkpoint) {
if (db->db_restart.xres_cp_required)
check_type = XT_CHECKPOINT_NO_PAUSE;
check_type = XT_CHECKPOINT_PAUSE_IF_ACTIVITY;
check_type = XT_CHECKPOINT_IF_NO_ACTIVITY;
/* Start a checkpoint: */
if (!xt_begin_checkpoint(db, FALSE, self))
while (!self->t_quit) {
if (!xres_get_next_to_flush(db, &tab_id, &flush_bit, TRUE, FALSE))
goto end_checkpoint;
if (ot->ot_table->tab_id != tab_id) {
xt_db_return_table_to_pool(self, ot);
ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE);
if (flush_bit == XT_CPT_REC_ROW_FLUSHED) {
if (!xt_flush_record_row(ot, &bytes_flushed, FALSE)) {
xt_db_return_table_to_pool(self, ot);
ASSERT(flush_bit == XT_CPT_INDEX_FLUSHED);
if (!xt_flush_indices(ot, &bytes_flushed, FALSE, NULL)) {
xt_db_return_table_to_pool(self, ot);
/* The table was not found. Question: should I just complete the checkpoint,
* or should I cause an error?
* At the moment, I print the error and warning, and continue with the checkpoint.
if (self->t_exception.e_xt_err)
xt_log_and_clear_exception(self);
xt_logf(XT_NT_WARNING, "Checkpoint skipping table (ID) %lu: table was not found\n", (u_long) tab_id);
xt_checkpoint_set_flush_state(db, tab_id, XT_CPT_STATE_DONE_ALL);
switch (check_type) {
case XT_CHECKPOINT_IF_NO_ACTIVITY:
if (bytes_flushed > 0 && curr_writer_total != db->db_xn_total_writer_count)
goto end_checkpoint;
case XT_CHECKPOINT_PAUSE_IF_ACTIVITY:
if (bytes_flushed > 2 * 1024 * 1024 && curr_writer_total != db->db_xn_total_writer_count) {
curr_writer_total = db->db_xn_total_writer_count;
xt_sleep_milli_second(400);
case XT_CHECKPOINT_NO_PAUSE:
xt_db_return_table_to_pool(self, ot);
if (!xt_end_checkpoint(db, self, NULL))
/* Wait for the log writer to tell us to do something.
static void xres_cp_wait_for_log_writer(XTThreadPtr self, XTDatabaseHPtr db, u_long milli_secs)
xt_lock_mutex(self, &db->db_cp_lock);
pushr_(xt_unlock_mutex, &db->db_cp_lock);
xt_timed_wait_cond(self, &db->db_cp_cond, &db->db_cp_lock, milli_secs);
freer_(); // xt_unlock_mutex(&db->db_cp_lock)
static void xres_flush_indices(XTThreadPtr self, XTDatabaseHPtr db)
XTTableEntryPtr te_ptr;
if (!xt_db_index_dirty_threshold || xt_get_index_cache_dirty_perc() < xt_db_index_dirty_threshold)
/* Flush all indexes: */
xt_enum_tables_init(&edx);
while ((te_ptr = xt_enum_tables_next(NULL, db, &edx))) {
if ((tab = te_ptr->te_table)) {
if (!xt_async_flush_indices(tab, FALSE, FALSE, self))
* This is the way checkpoint works:
* To write a checkpoint we need to flush all tables in
* Before flushing the first table we get the checkpoint
* After flushing all files we write out the checkpoint
static void xres_cp_main(XTThreadPtr self)
XTDatabaseHPtr db = self->st_database;
u_int curr_writer_total;
time_t now, last_index_flush;
xtBool async = TRUE;
xtXactID sweep_count;
xt_set_low_priority(self);
xt_db_approximate_time = time(NULL);
last_index_flush = xt_db_approximate_time;
while (!self->t_quit) {
/* Wait 2 seconds: */
curr_writer_total = db->db_xn_total_writer_count;
xt_db_approximate_time = time(NULL);
now = xt_db_approximate_time;
while (!self->t_quit && xt_db_approximate_time < now + 2 && !db->db_restart.xres_cp_required) {
xres_cp_wait_for_log_writer(self, db, 400);
xt_db_approximate_time = time(NULL);
xt_db_free_unused_open_tables(self, db);
xt_db_approximate_time = time(NULL);
if (xt_db_approximate_time > last_index_flush + 2) {
last_index_flush = xt_db_approximate_time;
xres_flush_indices(self, db);
/* If there is a long running transaction, then there is activity!
* And if the sweeper is not idle, then there is also activity!
* And if the writer is active, then we should wait for it as well!
#ifdef XT_SWEEPER_SORT_XACTS
sweep_count = db->db_xn_curr_id + 1 - db->db_sw_to_add + db->db_sw_list_size;
sweep_count = db->db_xn_curr_id + 1 - db->db_xn_to_clean_id;
if (!db->db_xn_long_running_count &&
curr_writer_total == db->db_xn_total_writer_count &&
db->db_sw_idle == XT_THREAD_IDLE &&
db->db_wr_idle == XT_THREAD_IDLE) {
/* No activity in 2 seconds: */
xres_cp_async_checkpoint(self, db, curr_writer_total, TRUE);
xres_cp_checkpoint(self, db, curr_writer_total, FALSE);
/* The server is busy, check if we need to
* write a checkpoint anyway...
if (db->db_restart.xres_cp_required ||
db->db_restart.xres_is_checkpoint_pending(db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset)) {
/* Flush tables, until the checkpoint is complete. */
if (db->db_restart.xres_is_checkpoint_pending(db->db_wr_log_id, db->db_wr_log_offset)) {
/* Wait until the writer has done enough for the checkpoint: */
xres_cp_async_checkpoint(self, db, curr_writer_total, TRUE);
xres_cp_checkpoint(self, db, curr_writer_total, TRUE);
/* Wake the writer if necessary: */
if (db->db_wr_idle == XT_THREAD_IDLE) {
if (!xt_broadcast_cond_ns(&db->db_wr_cond))
xt_log_and_clear_exception_ns();
if (curr_writer_total == db->db_xn_total_writer_count) {
/* We did a checkpoint, and still, nothing has
* Wait for something to happen:
xtLogOffset log_offset;
xt_db_approximate_time = time(NULL);
while (!self->t_quit && curr_writer_total == db->db_xn_total_writer_count) {
/* The writer position: */
xt_lock_mutex(self, &db->db_wr_lock);
pushr_(xt_unlock_mutex, &db->db_wr_lock);
log_id = db->db_wr_log_id;
log_offset = db->db_wr_log_offset;
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
/* This condition means we could checkpoint: */
if (!(xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0) &&
xt_sl_get_size(db->db_xn_xa_list) == 0)
xres_cp_wait_for_log_writer(self, db, 400);
xt_db_approximate_time = time(NULL);
xt_db_free_unused_open_tables(self, db);
xt_db_approximate_time = time(NULL);
if (xt_db_approximate_time > last_index_flush + 2) {
last_index_flush = xt_db_approximate_time;
xres_flush_indices(self, db);
static void *xres_cp_run_thread(XTThreadPtr self)
XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data;
if (!(mysql_thread = myxt_create_thread()))
while (!self->t_quit) {
* The garbage collector requires that the database
* is in use because.
xt_use_database(self, db, XT_FOR_CHECKPOINTER);
/* This action is both safe and required (see {BACKGROUND-RELEASE-DB}) */
xt_heap_release(self, self->st_database);
/* This error is "normal"! */
if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
self->t_exception.e_sys_err == SIGTERM))
xt_log_and_clear_exception(self);
/* Avoid releasing the database (done above) */
self->st_database = NULL;
xt_unuse_database(self, self);
/* After an exception, pause before trying again... */
/* Number of seconds */
while (!self->t_quit && count > 0) {
* {MYSQL-THREAD-KILL}
myxt_destroy_thread(mysql_thread, TRUE);
static void xres_cp_free_thread(XTThreadPtr self, void *data)
XTDatabaseHPtr db = (XTDatabaseHPtr) data;
if (db->db_cp_thread) {
xt_lock_mutex(self, &db->db_cp_lock);
pushr_(xt_unlock_mutex, &db->db_cp_lock);
db->db_cp_thread = NULL;
freer_(); // xt_unlock_mutex(&db->db_cp_lock)
/* Start a checkpoint, if none has been started. */
xtPublic xtBool xt_begin_checkpoint(XTDatabaseHPtr db, xtBool have_table_lock, XTThreadPtr thread)
XTCheckPointStatePtr cp = &db->db_cp_state;
xtLogOffset log_offset;
xtLogID ind_rec_log_id;
xtLogOffset ind_rec_log_offset;
XTTableEntryPtr te_ptr;
XTCheckPointTableRec cpt;
XTSortedListPtr tables = NULL;
/* during startup we can get an error before the checkpointer is inited */
/* First check if a checkpoint is already running: */
xt_lock_mutex_ns(&cp->cp_state_lock);
if (cp->cp_running) {
xt_unlock_mutex_ns(&cp->cp_state_lock);
if (cp->cp_table_ids) {
xt_free_sortedlist(NULL, cp->cp_table_ids);
cp->cp_table_ids = NULL;
xt_unlock_mutex_ns(&cp->cp_state_lock);
/* Flush the log before we continue. This is to ensure that,
* before we write a checkpoint, the changes
* done by the sweeper and the compactor have been
* Note, the sweeper does not flush the log, so this is
* --- I have removed this flush. It is actually just a
* minor optimisation, which pushes the flush position
* Note that the writer position used for the checkpoint
* _will_ be behind the current log flush position.
* This is because the writer cannot apply log changes
* until they are flushed.
/* This is an alternative to the above.
if (!xt_xlog_flush_log(db, self))
xt_lock_mutex_ns(&db->db_wr_lock);
/* The theoretical maximum restart log position is the
* position of the writer thread:
log_id = db->db_wr_log_id;
log_offset = db->db_wr_log_offset;
ind_rec_log_id = db->db_xlog.xl_flush_log_id;
ind_rec_log_offset = db->db_xlog.xl_flush_log_offset;
xt_unlock_mutex_ns(&db->db_wr_lock);
/* Go through all the transactions, and find
* the lowest log start position of all the transactions.
for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
seg = &db->db_xn_idx[i];
XT_XACT_READ_LOCK(&seg->xs_tab_lock, self);
for (u_int j=0; j<XT_XN_HASH_TABLE_SIZE; j++) {
xact = seg->xs_table[j];
/* If the transaction is logged, but not cleaned: */
if ((xact->xd_flags & (XT_XN_XAC_LOGGED | XT_XN_XAC_CLEANED)) == XT_XN_XAC_LOGGED) {
if (xt_comp_log_pos(log_id, log_offset, xact->xd_begin_log, xact->xd_begin_offset) > 0) {
log_id = xact->xd_begin_log;
log_offset = xact->xd_begin_offset;
xact = xact->xd_next_xact;
XT_XACT_UNLOCK(&seg->xs_tab_lock, self, FALSE);
#ifdef TRACE_CHECKPOINT
printf("BEGIN CHECKPOINT %d-%llu\n", (int) log_id, (u_llong) log_offset);
/* Go through all tables, and find the lowest log position.
* The log position stored by each table shows the position of
* the next operation that still needs to be applied.
* This comes from the list of operations which are
* queued for the table.
* This function also builds a list of tables!
if (!(tables = xt_new_sortedlist_ns(sizeof(XTCheckPointTableRec), 20, xres_comp_flush_tabs, NULL, NULL)))
xt_enum_tables_init(&edx);
if (!have_table_lock)
xt_ht_lock(NULL, db->db_tables);
while ((te_ptr = xt_enum_tables_next(NULL, db, &edx))) {
if ((tab = te_ptr->te_table)) {
xt_sl_lock_ns(tab->tab_op_list, thread);
if ((op = (XTOperationPtr) xt_sl_first_item(tab->tab_op_list))) {
if (xt_comp_log_pos(log_id, log_offset, op->or_log_id, op->or_log_offset) > 0) {
log_id = op->or_log_id;
log_offset = op->or_log_offset;
xt_sl_unlock(NULL, tab->tab_op_list);
cpt.cpt_flushed = 0;
cpt.cpt_tab_id = tab->tab_id;
#ifdef TRACE_CHECKPOINT
printf("CHECKPOINT will flush: %d %s\n", (int) tab->tab_id, tab->tab_name->ps_path);
if (!xt_sl_insert(NULL, tables, &tab->tab_id, &cpt)) {
if (!have_table_lock)
xt_ht_unlock(NULL, db->db_tables);
xt_free_sortedlist(NULL, tables);
if (!have_table_lock)
xt_ht_unlock(NULL, db->db_tables);
xt_lock_mutex_ns(&cp->cp_state_lock);
/* If there is a table list, then someone was faster than me! */
/* NOTE: log_offset can be zero, and valid!
* So we only check the log_id.
if (!cp->cp_running && log_id) {
cp->cp_running = TRUE;
cp->cp_log_id = log_id;
cp->cp_log_offset = log_offset;
cp->cp_ind_rec_log_id = ind_rec_log_id;
cp->cp_ind_rec_log_offset = ind_rec_log_offset;
cp->cp_flush_count = 0;
cp->cp_next_to_flush = 0;
cp->cp_table_ids = tables;
xt_free_sortedlist(NULL, tables);
xt_unlock_mutex_ns(&cp->cp_state_lock);
/* At this point, log flushing can begin... */
* This function returns TRUE if there is still something to flush according to the current
* If so, it returns the table ID and a bit indicating whether the table data, or the
* index should be flushed.
* Note, the function also checks if an asynchronous flush is in progress. It will not return
* a reference to a flush that is already in progress.
static xtBool xres_get_next_to_flush(XTDatabaseHPtr db, xtTableID *tab_id, int *flush_bit, xtBool skip_busy, xtBool rec_first)
XTCheckPointStatePtr cp = &db->db_cp_state;
XTCheckPointTablePtr cp_tab;
size_t table_count, tab_idx;
xt_lock_mutex_ns(&cp->cp_state_lock);
if (!cp->cp_running || !cp->cp_table_ids)
goto flush_complete;
if (!(table_count = xt_sl_get_size(cp->cp_table_ids)))
goto flush_complete;
ASSERT_NS(cp->cp_flush_count <= table_count);
if (cp->cp_flush_count >= table_count)
goto flush_complete;
/* Try all tables: */
for (size_t i=0; i<table_count; i++) {
if (cp->cp_next_to_flush > table_count*2) {
cp->cp_next_to_flush = 0;
/* Output a dummy "no table" to indicate we are wrapping! */
/* First all record/row files are flushed, then
if (cp->cp_next_to_flush < table_count) {
tab_idx = cp->cp_next_to_flush;
flush_index = FALSE;
tab_idx = cp->cp_next_to_flush - table_count;
/* Flush each table, one after the other,
* record/row file followed by index file:
tab_idx = cp->cp_next_to_flush / 2;
flush_index = (cp->cp_next_to_flush % 2) != 0;
cp->cp_next_to_flush++;
if ((cp_tab = (XTCheckPointTablePtr) xt_sl_item_at(cp->cp_table_ids, tab_idx))) {
*tab_id = cp_tab->cpt_tab_id;
if (!(cp_tab->cpt_flushed & XT_CPT_INDEX_FLUSHED)) {
if (!skip_busy || !(cp_tab->cpt_flushed & XT_CPT_INDEX_FLUSHING)) {
*flush_bit = XT_CPT_INDEX_FLUSHED;
if (!(cp_tab->cpt_flushed & XT_CPT_REC_ROW_FLUSHED)) {
if (!skip_busy || !(cp_tab->cpt_flushed & XT_CPT_REC_ROW_FLUSHING)) {
*flush_bit = XT_CPT_REC_ROW_FLUSHED;
xt_unlock_mutex_ns(&cp->cp_state_lock);
xt_unlock_mutex_ns(&cp->cp_state_lock);
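
/* Illustrative sketch only (not part of PBXT): the two flush orders used by
 * xres_get_next_to_flush() above. Given a running counter 'next' and
 * 'table_count' tables, the counter is mapped to a (table index, flush index?)
 * pair either "all record/row files first, then all indexes" (rec_first), or
 * strictly interleaved per table. Names are hypothetical.
 */
static void example_flush_order(size_t next, size_t table_count, int rec_first, size_t *tab_idx, int *flush_index)
{
	if (rec_first) {
		if (next < table_count) {
			*tab_idx = next;				/* pass 1: record/row files */
			*flush_index = 0;
		}
		else {
			*tab_idx = next - table_count;	/* pass 2: index files */
			*flush_index = 1;
		}
	}
	else {
		*tab_idx = next / 2;				/* table i: record/row file, then index */
		*flush_index = (int) (next % 2);
	}
}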
3373
xtPublic void xt_checkpoint_set_flush_state(XTDatabaseHPtr db, xtTableID tab_id, int state)
3375
XTCheckPointStatePtr cp = &db->db_cp_state;
3376
XTCheckPointTablePtr cp_tab;
3379
xt_lock_mutex_ns(&cp->cp_state_lock);
3380
if (cp->cp_running) {
3381
cp_tab = (XTCheckPointTablePtr) xt_sl_find(NULL, cp->cp_table_ids, &tab_id);
3384
case XT_CPT_STATE_START_REC_ROW:
3385
cp_tab->cpt_flushed |= XT_CPT_REC_ROW_FLUSHING;
3387
case XT_CPT_STATE_STOP_REC_ROW:
3388
cp_tab->cpt_flushed &= ~XT_CPT_REC_ROW_FLUSHING;
3390
case XT_CPT_STATE_DONE_REC_ROW:
3391
cp_tab->cpt_flushed &= ~XT_CPT_REC_ROW_FLUSHING;
3392
flush_bit = XT_CPT_REC_ROW_FLUSHED;
3394
case XT_CPT_STATE_START_INDEX:
3395
cp_tab->cpt_flushed |= XT_CPT_INDEX_FLUSHING;
3397
case XT_CPT_STATE_STOP_INDEX:
3398
cp_tab->cpt_flushed &= ~XT_CPT_INDEX_FLUSHING;
3400
case XT_CPT_STATE_DONE_INDEX:
3401
cp_tab->cpt_flushed &= ~XT_CPT_INDEX_FLUSHING;
3402
flush_bit = XT_CPT_INDEX_FLUSHED;
3404
case XT_CPT_STATE_DONE_ALL:
3405
cp_tab->cpt_flushed &= ~(XT_CPT_REC_ROW_FLUSHING | XT_CPT_INDEX_FLUSHING);
3406
flush_bit = XT_CPT_REC_ROW_FLUSHED | XT_CPT_INDEX_FLUSHED;
3410
if (flush_bit && ((cp_tab->cpt_flushed & XT_CPT_ALL_FLUSHED) != XT_CPT_ALL_FLUSHED)) {
3411
cp_tab->cpt_flushed |= flush_bit;
3412
if ((cp_tab->cpt_flushed & XT_CPT_ALL_FLUSHED) == XT_CPT_ALL_FLUSHED) {
3413
ASSERT_NS(cp->cp_flush_count < xt_sl_get_size(cp->cp_table_ids));
3414
cp->cp_flush_count++;
3419
xt_unlock_mutex_ns(&cp->cp_state_lock);
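
/* Illustrative sketch only (not part of PBXT): the per-table flush bookkeeping
 * used by the checkpoint state above. Each table carries two "in progress" bits
 * and two "done" bits; the checkpoint is complete once every table has both
 * "done" bits set. The names and helper below are hypothetical.
 */
enum {
	EX_REC_ROW_FLUSHING = 1,
	EX_REC_ROW_FLUSHED  = 2,
	EX_INDEX_FLUSHING   = 4,
	EX_INDEX_FLUSHED    = 8,
	EX_ALL_FLUSHED      = EX_REC_ROW_FLUSHED | EX_INDEX_FLUSHED
};

/* Clear the "busy" bit, set the "done" bit, and report whether this call
 * completed the table (so the caller can bump its flush count once): */
static int example_mark_done(unsigned int *flags, unsigned int done_bit, unsigned int busy_bit)
{
	*flags &= ~busy_bit;
	if ((*flags & EX_ALL_FLUSHED) == EX_ALL_FLUSHED)
		return 0;							/* already counted as complete */
	*flags |= done_bit;
	return (*flags & EX_ALL_FLUSHED) == EX_ALL_FLUSHED;
}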
/* End a checkpoint, if a checkpoint has been started,
* and all checkpoint tables have been flushed
xtPublic xtBool xt_end_checkpoint(XTDatabaseHPtr db, XTThreadPtr thread, xtBool *checkpoint_done)
XTCheckPointStatePtr cp = &db->db_cp_state;
XTXlogCheckpointDPtr cp_buf = NULL;
char path[PATH_MAX];
size_t chk_size = 0;
u_int no_of_logs = 0;
/* As long as we have outstanding XA transactions, we may not checkpoint! */
if (xt_sl_get_size(db->db_xn_xa_list) > 0) {
printf("Checkpoint must wait\n");
#ifdef NEVER_CHECKPOINT
/* Lock the checkpoint state so that only one thread can do this! */
xt_lock_mutex_ns(&cp->cp_state_lock);
if (!cp->cp_running)
goto checkpoint_complete;
if (cp->cp_table_ids)
table_count = xt_sl_get_size(cp->cp_table_ids);
if (cp->cp_flush_count < table_count) {
/* Checkpoint is not done yet! */
xt_unlock_mutex_ns(&cp->cp_state_lock);
if (checkpoint_done)
*checkpoint_done = FALSE;
/* Check if anything has changed since the last checkpoint,
* if not, there is no need to write a new checkpoint!
if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
xt_comp_log_pos(cp->cp_log_id, cp->cp_log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0) {
/* A checkpoint is required if the size of the deleted
* list is not zero. The reason is, I cannot remove the
* logs from the deleted list BEFORE a checkpoint has been
* done which does NOT include these logs.
* Even though the logs have already been deleted, they
* remain on the deleted list to ensure that they are NOT
* reused during this time, until the next checkpoint.
* This is done because if they are used, then on restart
* they would be deleted!
#ifdef TRACE_CHECKPOINT
printf("--- END CHECKPOINT - no write\n");
goto checkpoint_complete;
#ifdef TRACE_CHECKPOINT
printf("--- END CHECKPOINT - write start point\n");
xt_lock_mutex_ns(&db->db_datalogs.dlc_lock);
no_of_logs = xt_sl_get_size(db->db_datalogs.dlc_to_delete);
chk_size = offsetof(XTXlogCheckpointDRec, xcp_del_log) + no_of_logs * 2;
xtLogID *log_id_ptr;
if (!(cp_buf = (XTXlogCheckpointDPtr) xt_malloc_ns(chk_size))) {
xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
/* Increment the checkpoint number. This value is used if 2 checkpoints have the
* same log number. In this case checkpoints may differ in the log files
* that should be deleted. Here it is important to use the most recent
db->db_restart.xres_cp_number++;
/* Create the checkpoint record: */
XT_SET_DISK_4(cp_buf->xcp_head_size_4, chk_size);
XT_SET_DISK_2(cp_buf->xcp_version_2, XT_CHECKPOINT_VERSION);
XT_SET_DISK_6(cp_buf->xcp_chkpnt_no_6, db->db_restart.xres_cp_number);
XT_SET_DISK_4(cp_buf->xcp_log_id_4, cp->cp_log_id);
XT_SET_DISK_6(cp_buf->xcp_log_offs_6, cp->cp_log_offset);
XT_SET_DISK_4(cp_buf->xcp_tab_id_4, db->db_curr_tab_id);
XT_SET_DISK_4(cp_buf->xcp_xact_id_4, db->db_xn_curr_id);
XT_SET_DISK_4(cp_buf->xcp_ind_rec_log_id_4, cp->cp_ind_rec_log_id);
XT_SET_DISK_6(cp_buf->xcp_ind_rec_log_offs_6, cp->cp_ind_rec_log_offset);
XT_SET_DISK_2(cp_buf->xcp_log_count_2, no_of_logs);
for (u_int i=0; i<no_of_logs; i++) {
log_id_ptr = (xtLogID *) xt_sl_item_at(db->db_datalogs.dlc_to_delete, i);
XT_SET_DISK_2(cp_buf->xcp_del_log[i], (xtWord2) *log_id_ptr);
XT_SET_DISK_2(cp_buf->xcp_checksum_2, xt_get_checksum(((xtWord1 *) cp_buf) + 2, chk_size - 2, 1));
xt_unlock_mutex_ns(&db->db_datalogs.dlc_lock);
/* Write the checkpoint: */
db->db_restart.xres_name(PATH_MAX, path, db->db_restart.xres_next_res_no);
if (!(of = xt_open_file_ns(path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 16*1024*1024)))
if (!xt_set_eof_file(NULL, of, 0))
if (!xt_pwrite_file(of, 0, chk_size, (xtWord1 *) cp_buf, &thread->st_statistics.st_x, thread))
if (!xt_flush_file(of, &thread->st_statistics.st_x, thread))
xt_close_file_ns(of);
/* Next time write the other restart file: */
db->db_restart.xres_next_res_no = (db->db_restart.xres_next_res_no % 2) + 1;
db->db_restart.xres_cp_log_id = cp->cp_log_id;
db->db_restart.xres_cp_log_offset = cp->cp_log_offset;
db->db_restart.xres_cp_required = FALSE;
* Remove all the data logs that were deleted on the
if (!xres_remove_data_logs(db))
#ifndef DEBUG_KEEP_LOGS
/* After checkpoint, we can delete transaction logs that will no longer be required
if (cp->cp_log_id > 1) {
xtLogID current_log_id = cp->cp_log_id;
#ifdef XT_NUMBER_OF_LOGS_TO_SAVE
if (pbxt_crash_debug) {
/* To save the logs, we just consider them in use: */
if (current_log_id > XT_NUMBER_OF_LOGS_TO_SAVE)
current_log_id -= XT_NUMBER_OF_LOGS_TO_SAVE;
del_log_id = current_log_id - 1;
while (del_log_id > 0) {
db->db_xlog.xlog_name(PATH_MAX, path, del_log_id);
if (!xt_fs_exists(path))
/* This was the lowest log ID that existed: */
/* Delete all logs that still exist, that come before
* Do this from least to greatest to ensure no "holes" appear.
while (del_log_id < current_log_id) {
switch (db->db_xlog.xlog_delete_log(del_log_id, thread)) {
/* And we can delete data logs in the list, and place them
* on the deleted list.
for (u_int i=0; i<no_of_logs; i++) {
log_id = (xtLogID) XT_GET_DISK_2(cp_buf->xcp_del_log[i]);
if (!xres_delete_data_log(db, log_id))
checkpoint_complete:
cp->cp_running = FALSE;
if (cp->cp_table_ids) {
xt_free_sortedlist(NULL, cp->cp_table_ids);
cp->cp_table_ids = NULL;
cp->cp_flush_count = 0;
cp->cp_next_to_flush = 0;
db->db_restart.xres_cp_required = FALSE;
xt_unlock_mutex_ns(&cp->cp_state_lock);
if (checkpoint_done)
*checkpoint_done = TRUE;
xt_close_file_ns(of);
xt_unlock_mutex_ns(&cp->cp_state_lock);
xtPublic xtWord8 xt_bytes_since_last_checkpoint(XTDatabaseHPtr db, xtLogID curr_log_id, xtLogOffset curr_log_offset)
xtLogOffset log_offset;
size_t byte_count = 0;
log_id = db->db_restart.xres_cp_log_id;
log_offset = db->db_restart.xres_cp_log_offset;
/* Assume the logs have the threshold: */
if (log_id < curr_log_id) {
if (log_offset < xt_db_log_file_threshold)
byte_count = (size_t) (xt_db_log_file_threshold - log_offset);
while (log_id < curr_log_id) {
byte_count += (size_t) xt_db_log_file_threshold;
if (log_offset < curr_log_offset)
byte_count += (size_t) (curr_log_offset - log_offset);
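
/* Illustrative sketch only (not part of PBXT): a rough version of the
 * "bytes written since the last checkpoint" estimate above, assuming every
 * transaction log is at most 'log_file_threshold' bytes long. The function
 * name and exact handling of partial logs are assumptions for illustration.
 */
static unsigned long long example_bytes_between(unsigned int from_log, unsigned long long from_offset,
	unsigned int to_log, unsigned long long to_offset, unsigned long long log_file_threshold)
{
	unsigned long long bytes = 0;

	if (from_log < to_log) {
		/* The rest of the first log, plus all full logs in between: */
		if (from_offset < log_file_threshold)
			bytes += log_file_threshold - from_offset;
		bytes += (unsigned long long) (to_log - from_log - 1) * log_file_threshold;
		/* Plus whatever has been written to the current log: */
		bytes += to_offset;
	}
	else if (from_offset < to_offset)
		bytes += to_offset - from_offset;
	return bytes;
}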
xtPublic void xt_start_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
char name[PATH_MAX];
sprintf(name, "CP-%s", xt_last_directory_of_path(db->db_main_path));
xt_remove_dir_char(name);
db->db_cp_thread = xt_create_daemon(self, name);
xt_set_thread_data(db->db_cp_thread, db, xres_cp_free_thread);
xt_run_thread(self, db->db_cp_thread, xres_cp_run_thread);
xtPublic void xt_wait_for_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
xtBool message = FALSE;
xtLogOffset log_offset;
if (db->db_cp_thread) {
xt_lock_mutex(self, &db->db_wr_lock);
pushr_(xt_unlock_mutex, &db->db_wr_lock);
log_id = db->db_wr_log_id;
log_offset = db->db_wr_log_offset;
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
if (xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0)
/* Do a final checkpoint before shutdown: */
db->db_restart.xres_cp_required = TRUE;
xt_lock_mutex(self, &db->db_cp_lock);
pushr_(xt_unlock_mutex, &db->db_cp_lock);
if (!xt_broadcast_cond_ns(&db->db_cp_cond)) {
xt_log_and_clear_exception_ns();
freer_(); // xt_unlock_mutex(&db->db_cp_lock)
xt_sleep_milli_second(10);
if (now >= then + 16) {
xt_logf(XT_NT_INFO, "Aborting wait for '%s' checkpointer\n", db->db_name);
if (now >= then + 2) {
xt_logf(XT_NT_INFO, "Waiting for '%s' checkpointer...\n", db->db_name);
xt_logf(XT_NT_INFO, "Checkpointer '%s' done.\n", db->db_name);
xtPublic void xt_stop_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
if (db->db_cp_thread) {
xt_lock_mutex(self, &db->db_cp_lock);
pushr_(xt_unlock_mutex, &db->db_cp_lock);
/* This pointer is safe as long as you have the transaction lock. */
if ((thr_wr = db->db_cp_thread)) {
xtThreadID tid = thr_wr->t_id;
/* Make sure the thread quits when woken up. */
xt_terminate_thread(self, thr_wr);
xt_wake_checkpointer(self, db);
freer_(); // xt_unlock_mutex(&db->db_cp_lock)
* GOTCHA: This is a weird thing but the SIGTERM directed
* at a particular thread (in this case the sweeper) was
* being caught by a different thread and killing the server
* sometimes. Disconcerting.
* (this may only be a problem on Mac OS X)
xt_kill_thread(thread);
xt_wait_for_thread_to_exit(tid, FALSE);
/* PMC - It should not be necessary to set the signal here, but in the
* debugger the handler is not called!!?
thr_wr->t_delayed_signal = SIGTERM;
xt_kill_thread(thread);
db->db_cp_thread = NULL;
freer_(); // xt_unlock_mutex(&db->db_cp_lock)
xtPublic void xt_wake_checkpointer(XTThreadPtr self, XTDatabaseHPtr db)
if (!xt_broadcast_cond_ns(&db->db_cp_cond))
xt_log_and_clear_exception(self);
xtPublic void xt_free_writer_state(struct XTThread *self, XTWriterStatePtr ws)
ws->ws_db->db_xlog.xlog_seq_exit(&ws->ws_seqread);
xt_db_set_size(self, &ws->ws_databuf, 0);
xt_ib_free(self, &ws->ws_rec_buf);
xt_db_return_table_to_pool(self, ws->ws_ot);
xtPublic void xt_dump_xlogs(XTDatabaseHPtr db, xtLogID start_log)
XTXactSeqReadRec seq;
XTXactLogBufferDPtr record;
xtLogID log_id = db->db_restart.xres_cp_log_id;
char log_path[PATH_MAX];
XTThreadPtr thread = xt_get_self();
/* Find the first log that still exists: */
db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
if (!xt_fs_exists(log_path))
if (!db->db_xlog.xlog_seq_init(&seq, xt_db_log_buffer_size, FALSE))
if (log_id < start_log)
db->db_xlog.xlog_name(PATH_MAX, log_path, log_id);
if (!xt_fs_exists(log_path))
if (!db->db_xlog.xlog_seq_start(&seq, log_id, 0, FALSE))
PRINTF("---------- DUMP LOG %d\n", (int) log_id);
if (!db->db_xlog.xlog_seq_next(&seq, &record, TRUE, thread)) {
PRINTF("---------- DUMP LOG %d ERROR\n", (int) log_id);
xt_log_and_clear_exception_ns();
PRINTF("---------- DUMP LOG %d DONE\n", (int) log_id);
xt_print_log_record(seq.xseq_rec_log_id, seq.xseq_rec_log_offset, record);
db->db_xlog.xlog_seq_exit(&seq);
/* ----------------------------------------------------------------------
* D A T A B A S E   R E C O V E R Y   T H R E A D
static XTThreadPtr xres_recovery_thread;
static void *xn_xres_run_recovery_thread(XTThreadPtr self)
if (!(mysql_thread = (THD *) myxt_create_thread()))
// static const std::string plugin_name("PBXT");
// while (!xres_recovery_thread->t_quit && !Registry::singleton().find(plugin_name))
// xt_sleep_milli_second(1);
// while (!xres_recovery_thread->t_quit && !ha_resolve_by_legacy_type(mysql_thread, DB_TYPE_PBXT))
// xt_sleep_milli_second(1);
myxt_wait_pbxt_plugin_slot_assigned(self);
if (!xres_recovery_thread->t_quit) {
* It can happen that something will just get in before this
* thread and open/recover the database!
if (!pbxt_database) {
xt_open_database(self, mysql_real_data_home, TRUE);
* This can be done at the same time as the recovery thread,
* strictly speaking I need a lock.
if (!pbxt_database) {
pbxt_database = self->st_database;
xt_heap_reference(self, pbxt_database);
xt_use_database(self, pbxt_database, XT_FOR_USER);
pbxt_recovery_state = XT_RECOVER_DONE;
/* {WAIT-FOR-SW-AFTER-RECOV}
db = self->st_database;
xt_lock_mutex(self, &db->db_init_sweep_lock);
pushr_(xt_unlock_mutex, &db->db_init_sweep_lock);
if (!db->db_init_sweep_done) {
XTTableEntryPtr te_ptr;
xt_init_sweeper_wait(self, db);
/* Memory tables are not recovered. This loop will
* make sure that they can be used, even though they are not recovered!
for (u_int i=0; i<xt_sl_get_size(db->db_table_by_id); i++) {
te_ptr = (XTTableEntryPtr) xt_sl_item_at(db->db_table_by_id, i);
if ((tab = te_ptr->te_table)) {
if (tab->tab_dic.dic_tab_flags & XT_TF_MEMORY_TABLE) {
tab->tab_recovery_not_done = FALSE;
tab->tab_dic.dic_disable_index = XT_INDEX_OK;
db->db_init_sweep_done = TRUE;
freer_(); // xt_unlock_mutex(&db->db_init_sweep_lock)
pbxt_recovery_state = XT_RECOVER_SWEPT;
pbxt_recovery_state = XT_RECOVER_ERROR;
xt_log_and_clear_exception(self);
* {MYSQL-THREAD-KILL}
* Here is the problem with destroying the thread at this
* point. If an error has occurred, then it can lead
* to a callback into pbxt: pbxt_panic().
* This will shut things down, making it impossible to quit the
* thread and do a cleanup. Solution:
* Move the MySQL thread destruction to a later point!
* sql/mysqld --no-defaults --basedir=~/maria/trunk
* --character-sets-dir=~/maria/trunk/sql/share/charsets
* --language=~/maria/trunk/sql/share/english
* --skip-networking --datadir=/tmp/x --skip-grant-tables --nonexistentoption
* #0 0x003893f9 in xt_exit_databases at database_xt.cc:304
* #1 0x0039dc7e in pbxt_end at ha_pbxt.cc:947
* #2 0x0039dd27 in pbxt_panic at ha_pbxt.cc:1289
* #3 0x001d619e in ha_finalize_handlerton at handler.cc:391
* #4 0x00279d22 in plugin_deinitialize at sql_plugin.cc:816
* #5 0x0027bcf5 in reap_plugins at sql_plugin.cc:904
* #6 0x0027c38c in plugin_thdvar_cleanup at sql_plugin.cc:2513
* #7 0x000c0db2 in THD::~THD at sql_class.cc:934
* #8 0x003b025b in myxt_destroy_thread at myxt_xt.cc:2999
* #9 0x003b66b5 in xn_xres_run_recovery_thread at restart_xt.cc:3196
* #10 0x003cbfbb in xt_thread_main at thread_xt.cc:1020
myxt_destroy_thread(mysql_thread, TRUE);
xres_recovery_thread = NULL;
xtPublic void xt_xres_start_database_recovery(XTThreadPtr self)
char name[PATH_MAX];
sprintf(name, "DB-RECOVERY-%s", xt_last_directory_of_path(mysql_real_data_home));
xt_remove_dir_char(name);
pbxt_recovery_state = XT_RECOVER_PENDING;
xres_recovery_thread = xt_create_daemon(self, name);
xt_run_thread(self, xres_recovery_thread, xn_xres_run_recovery_thread);
xtPublic void xt_xres_terminate_recovery(XTThreadPtr self)
XTThreadPtr thr_rec;
/* {MYSQL-THREAD-KILL}
* The stack above shows that this is possible!
if ((thr_rec = xres_recovery_thread) && (self != xres_recovery_thread)) {
xtThreadID tid = thr_rec->t_id;
xt_terminate_thread(self, thr_rec);
xt_wait_for_thread_to_exit(tid, TRUE);
/* ----------------------------------------------------------------------
* L O G   F L U S H   P R O C E S S
static void *xres_fl_run_thread(XTThreadPtr self)
XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data;
if (!(mysql_thread = myxt_create_thread()))
while (!self->t_quit) {
* The garbage collector requires that the database
* is in use because.
xt_use_database(self, db, XT_FOR_CHECKPOINTER);
/* This action is both safe and required (see details elsewhere) */
xt_heap_release(self, self->st_database);
xt_set_low_priority(self);
to_flush = xt_trace_clock() + XT_XLOG_FLUSH_FREQ * 1000;
/* Wait 1 second: */
while (!self->t_quit && xt_trace_clock() < to_flush)
xt_sleep_milli_second(10);
if (!db->db_xlog.xlog_flush(self))
to_flush += XT_XLOG_FLUSH_FREQ * 1000;
/* This error is "normal"! */
if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
self->t_exception.e_sys_err == SIGTERM))
xt_log_and_clear_exception(self);
/* Avoid releasing the database (done above) */
self->st_database = NULL;
xt_unuse_database(self, self);
/* After an exception, pause before trying again... */
/* Number of seconds */
while (!self->t_quit && count > 0) {
* {MYSQL-THREAD-KILL}
myxt_destroy_thread(mysql_thread, TRUE);
static void xres_fl_free_thread(XTThreadPtr self, void *data)
XTDatabaseHPtr db = (XTDatabaseHPtr) data;
if (db->db_fl_thread) {
xt_lock_mutex(self, &db->db_fl_lock);
pushr_(xt_unlock_mutex, &db->db_fl_lock);
db->db_fl_thread = NULL;
freer_(); // xt_unlock_mutex(&db->db_fl_lock)
xtPublic void xt_start_flusher(XTThreadPtr self, XTDatabaseHPtr db)
char name[PATH_MAX];
sprintf(name, "FL-%s", xt_last_directory_of_path(db->db_main_path));
xt_remove_dir_char(name);
db->db_fl_thread = xt_create_daemon(self, name);
xt_set_thread_data(db->db_fl_thread, db, xres_fl_free_thread);
xt_run_thread(self, db->db_fl_thread, xres_fl_run_thread);
xtPublic void xt_stop_flusher(XTThreadPtr self, XTDatabaseHPtr db)
if (db->db_fl_thread) {
xt_lock_mutex(self, &db->db_fl_lock);
pushr_(xt_unlock_mutex, &db->db_fl_lock);
/* This pointer is safe as long as you have the transaction lock. */
if ((thr_fl = db->db_fl_thread)) {
xtThreadID tid = thr_fl->t_id;
/* Make sure the thread quits when woken up. */
xt_terminate_thread(self, thr_fl);
freer_(); // xt_unlock_mutex(&db->db_fl_lock)
xt_wait_for_thread_to_exit(tid, FALSE);
db->db_fl_thread = NULL;
freer_(); // xt_unlock_mutex(&db->db_fl_lock)