~drizzle-trunk/drizzle/development

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/* Copyright (c) 2007 PrimeBase Technologies GmbH
 *
 * PrimeBase XT
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * 2007-11-12	Paul McCullagh
 *
 * H&G2JCtL
 *
 * Restart and write data to the database.
 */

#ifndef __restart_xt_h__
#define __restart_xt_h__

#include "pthread_xt.h"
#include "filesys_xt.h"
#include "sortedlist_xt.h"
#include "util_xt.h"
#include "xactlog_xt.h"

struct XTThread;
struct XTOpenTable;
struct XTDatabase;
struct XTTable;

/*
 * Capacity of XTWriterState.ws_tab_flush_list; only compiled in when
 * XT_SORT_REC_WRITES is defined.
 * TEST_SORT_REC_OVERFLOW shrinks the list to 5 entries, presumably so that
 * tests can exercise the code path taken when the list fills up.
 * XT_TABLE_LIST_INC is not referenced in this header; it is presumably a
 * growth/step value used by the implementation -- TODO confirm.
 */
#ifdef XT_SORT_REC_WRITES
#ifdef TEST_SORT_REC_OVERFLOW
#define XT_TABLE_LIST_SIZE		5
#else
#define XT_TABLE_LIST_SIZE		223
#endif
#define XT_TABLE_LIST_INC		23
#endif

/*
 * Global recovery progress indicator, defined in another translation unit.
 * xt_xres_wait_for_recovery() polls it and compares it numerically against
 * a requested state (presumably one of the XT_RECOVER_* values).
 * NOTE(review): it is a plain int read without any lock.
 */
extern int				pbxt_recovery_state;

/*
 * Writer state passed to xt_xres_apply_in_order() (and, with
 * XT_SORT_REC_WRITES, to xt_xres_flush_all()), and handed to
 * xt_free_writer_state() when no longer needed (presumably to release the
 * buffers it owns).
 * NOTE(review): the roles of the buffer members are inferred from their
 * types/names only -- confirm against the implementation.
 */
typedef struct XTWriterState {
	/* The database the records are applied to. */
	struct XTDatabase		*ws_db;
	/* Presumably TRUE while records are being applied during restart recovery. */
	xtBool					ws_in_recover;
	/* Index recovery log position (log ID and offset). */
	xtLogID					ws_ind_rec_log_id;
	xtLogOffset				ws_ind_rec_log_offset;
	/* Presumably the sequential reader over the transaction log. */
	XTXactSeqReadRec		ws_seqread;
	/* Work buffers. */
	XTDataBufferRec			ws_databuf;
	XTInfoBufferRec			ws_rec_buf;
	xtTableID				ws_tab_gone;					/* Cache the ID of the last table that does not exist. */
	xtTableID				ws_tab_temp;					/* Cache the ID of the last temporary table. */
	/* Presumably the open table handle of the table currently being written. */
	struct XTOpenTable		*ws_ot;
#ifdef XT_SORT_REC_WRITES
	/* Table IDs pending a flush; capacity is XT_TABLE_LIST_SIZE. */
	xtTableID				ws_tab_flush_list[XT_TABLE_LIST_SIZE];
#endif
} XTWriterStateRec, *XTWriterStatePtr;

/* Version of the checkpoint record layout (presumably stored in xcp_version_2). */
#define XT_CHECKPOINT_VERSION	1

/*
 * Checkpoint record. Every field uses an XTDiskValueN type, which indicates
 * a fixed byte-level encoding.
 * NOTE(review): treat field order and widths as a file format; changing
 * them presumably requires bumping XT_CHECKPOINT_VERSION.
 * The record ends with a variable-length array (XT_VAR_LENGTH) holding
 * xcp_log_count_2 entries.
 */
typedef struct XTXlogCheckpoint {
	XTDiskValue2			xcp_checksum_2;					/* The checksum of all the checkpoint data. */
	XTDiskValue4			xcp_head_size_4;				/* Presumably the size of the fixed part of the record -- TODO confirm. */
	XTDiskValue2			xcp_version_2;					/* The version of the checkpoint record. */
	XTDiskValue6			xcp_chkpnt_no_6;				/* Incremented for each checkpoint. */
	XTDiskValue4			xcp_log_id_4;					/* The restart log ID. */
	XTDiskValue6			xcp_log_offs_6;					/* The restart log offset. */
	XTDiskValue4			xcp_tab_id_4;					/* The current high table ID. */
	XTDiskValue4			xcp_xact_id_4;					/* The current high transaction ID. */
	XTDiskValue4			xcp_ind_rec_log_id_4;			/* The index recovery log ID. */
	XTDiskValue6			xcp_ind_rec_log_offs_6;		/* The index recovery log offset. */
	XTDiskValue2			xcp_log_count_2;				/* Number of logs to be deleted in the area below. */
	/*
	 * Presumably the IDs of the logs to be deleted (xcp_log_count_2 entries).
	 * NOTE(review): each entry is 2 bytes wide while xcp_log_id_4 is 4 bytes;
	 * confirm that the IDs stored here always fit in 16 bits.
	 */
	XTDiskValue2			xcp_del_log[XT_VAR_LENGTH];
} XTXlogCheckpointDRec, *XTXlogCheckpointDPtr;

/*
 * Restart/checkpoint bookkeeping for one database.
 * This struct declares member functions (with public/private sections), so
 * this header must be compiled as C++.
 */
typedef struct XTXactRestart {
	struct XTDatabase		*xres_db;
	int						xres_next_res_no;				/* The next restart file to be written. */
	xtLogID					xres_cp_log_id;					/* Log number of the last checkpoint. */
	xtLogOffset				xres_cp_log_offset;				/* Log offset of the last checkpoint */
	xtBool					xres_cp_required;				/* Checkpoint required (startup and shutdown). */
	xtWord8					xres_cp_number;					/* The checkpoint number (used to decide which is the latest checkpoint). */

public:
	/*
	 * Initialise restart state for db. The log_id/log_offset/max_log_id
	 * out-parameters presumably receive the log position from which
	 * recovery starts and the highest log ID found -- TODO confirm.
	 */
	void					xres_init(struct XTThread *self, struct XTDatabase *db, xtLogID *log_id, xtLogOffset *log_offset, xtLogID	*max_log_id);
	void					xres_exit(struct XTThread *self);
	/* Query / record a pending checkpoint at the given log position. */
	xtBool					xres_is_checkpoint_pending(xtLogID log_id, xtLogOffset log_offset);
	void					xres_checkpoint_pending(xtLogID log_id, xtLogOffset log_offset);
	/* Perform a checkpoint; the xtBool result presumably signals success. */
	xtBool					xres_checkpoint(struct XTThread *self);
	/* Presumably writes the restart file name for log_id into path (size bytes). */
	void					xres_name(size_t size, char *path, xtLogID log_id);

private:
	/* Verify the checksum of a checkpoint record of the given size. */
	xtBool					xres_check_checksum(XTXlogCheckpointDPtr buffer, size_t size);
	/* Presumably reports recovery progress as a percentage (perc). */
	void					xres_recover_progress(XTThreadPtr self, XTOpenFilePtr *of, int perc);
	/* Presumably replays the log from *log_id/*log_offset during startup. */
	xtBool					xres_restart(struct XTThread *self, xtLogID *log_id, xtLogOffset *log_offset, xtLogID ind_rec_log_id, off_t ind_rec_log_offset, xtLogID *max_log_id);
	/* Presumably computes how many log bytes recovery will read (for progress reporting). */
	off_t					xres_bytes_to_read(struct XTThread *self, struct XTDatabase *db, u_int *log_count, xtLogID *max_log_id);
} XTXactRestartRec, *XTXactRestartPtr;

/*
 * State of the (single) checkpoint in progress, guarded by cp_state_lock.
 * NOTE(review): which fields are read outside the lock is not visible here.
 */
typedef struct XTCheckPointState {
	xtBool					cp_inited;						/* TRUE if structure was inited */
	xt_mutex_type			cp_state_lock;					/* Lock on the entire checkpoint state. */
	xtBool					cp_running;						/* TRUE if a checkpoint is running. */
	/* Presumably the log position at which the running checkpoint was begun. */
	xtLogID					cp_log_id;
	xtLogOffset				cp_log_offset;
	/* Index recovery log position associated with this checkpoint. */
	xtLogID					cp_ind_rec_log_id;
	xtLogOffset				cp_ind_rec_log_offset;
	XTSortedListPtr			cp_table_ids;					/* List of tables to be flushed for the checkpoint. */
	u_int					cp_flush_count;					/* The number of tables flushed. */
	u_int					cp_next_to_flush;				/* The next table to be flushed. */
} XTCheckPointStateRec, *XTCheckPointStatePtr;

/*
 * Per-table checkpoint flush flags. These are distinct bits and may be
 * combined; XT_CPT_ALL_FLUSHED is the combination of both "flushed" bits.
 * Presumably stored in XTCheckPointTable.cpt_flushed.
 */
#define XT_CPT_NONE_FLUSHED			0
#define XT_CPT_REC_ROW_FLUSHED		1
#define XT_CPT_INDEX_FLUSHED		2
#define XT_CPT_REC_ROW_FLUSHING		4
#define XT_CPT_INDEX_FLUSHING		8
#define XT_CPT_ALL_FLUSHED			(XT_CPT_REC_ROW_FLUSHED | XT_CPT_INDEX_FLUSHED)

/*
 * Flush-progress states (sequential values, not bits); presumably the
 * `state` argument of xt_checkpoint_set_flush_state() -- TODO confirm.
 */
#define XT_CPT_STATE_START_REC_ROW	1
#define XT_CPT_STATE_STOP_REC_ROW	2
#define XT_CPT_STATE_DONE_REC_ROW	3
#define XT_CPT_STATE_START_INDEX	4
#define XT_CPT_STATE_STOP_INDEX		5
#define XT_CPT_STATE_DONE_INDEX		6
#define XT_CPT_STATE_DONE_ALL		7

/*
 * One table taking part in a checkpoint; presumably the element type of
 * XTCheckPointState.cp_table_ids.
 */
typedef struct XTCheckPointTable {
	/* Presumably a combination of the XT_CPT_*_FLUSHED / *_FLUSHING bits. */
	u_int					cpt_flushed;
	xtTableID				cpt_tab_id;
} XTCheckPointTableRec, *XTCheckPointTablePtr;

/* Set up / tear down the restart (recovery and checkpoint) state of a database. */
void xt_xres_init(struct XTThread *self, struct XTDatabase *db);
void xt_xres_exit(struct XTThread *self, struct XTDatabase *db);

/* Per-table counterparts of xt_xres_init() / xt_xres_exit(). */
void xt_xres_init_tab(struct XTThread *self, struct XTTable *tab);
void xt_xres_exit_tab(struct XTThread *self, struct XTTable *tab);

/* Only available when record writes are sorted (XT_SORT_REC_WRITES). */
#ifdef XT_SORT_REC_WRITES
xtBool	xt_xres_delay_flush(struct XTOpenTable *ot, xtBool lock);
void	xt_xres_flush_all(struct XTThread *self, XTWriterStatePtr ws);
#endif

/* Apply one transaction-log record, located at log_id/log_offset, using writer state ws. */
void	xt_xres_apply_in_order(struct XTThread *self, XTWriterStatePtr ws, xtLogID log_id, xtLogOffset log_offset, XTXactLogBufferDPtr record);

/*
 * Checkpoint control. xt_end_checkpoint() reports through checkpoint_done
 * whether a checkpoint completed; the xtBool results presumably signal
 * success. xt_checkpoint_set_flush_state() presumably takes an
 * XT_CPT_STATE_* value and xt_checkpoint_flush_done() an XT_CPT_*_FLUSHED
 * bit -- TODO confirm.
 */
xtBool	xt_begin_checkpoint(struct XTDatabase *db, xtBool have_table_lock, struct XTThread *thread);
xtBool	xt_end_checkpoint(struct XTDatabase *db, struct XTThread *thread, xtBool *checkpoint_done);
void	xt_checkpoint_set_flush_state(struct XTDatabase *db, xtTableID tab_id, int state);
void	xt_checkpoint_flush_done(struct XTDatabase *db, xtTableID tab_id, int flush_bit);
/* Lifecycle of the background checkpointer for db. */
void	xt_start_checkpointer(struct XTThread *self, struct XTDatabase *db);
void	xt_wait_for_checkpointer(struct XTThread *self, struct XTDatabase *db);
void	xt_stop_checkpointer(struct XTThread *self, struct XTDatabase *db);
void	xt_wake_checkpointer(struct XTThread *self, struct XTDatabase *db);
void	xt_free_writer_state(struct XTThread *self, XTWriterStatePtr ws);
/* Presumably the log distance from the last checkpoint to the given log position. */
xtWord8	xt_bytes_since_last_checkpoint(struct XTDatabase *db, xtLogID curr_log_id, xtLogOffset curr_log_offset);

/* Debugging helpers for printing transaction-log records. */
void xt_print_log_record(xtLogID log, off_t offset, XTXactLogBufferDPtr record);
void xt_dump_xlogs(struct XTDatabase *db, xtLogID start_log);

/* Presumably start and terminate database recovery (see pbxt_recovery_state). */
void xt_xres_start_database_recovery(XTThreadPtr self);
void xt_xres_terminate_recovery(XTThreadPtr self);

/* Presumably start/stop the background flusher for db. */
void xt_start_flusher(struct XTThread *self, struct XTDatabase *db);
void xt_stop_flusher(struct XTThread *self, struct XTDatabase *db);

/*
 * Recovery progress values, compared against pbxt_recovery_state by
 * xt_xres_wait_for_recovery() using a numeric "<" test. Keep them in
 * increasing order of progress. XT_RECOVER_ERROR is the largest value, so
 * an error state also ends a wait for any earlier state.
 */
#define XT_RECOVER_PENDING			0
#define XT_RECOVER_DONE				1
#define XT_RECOVER_SWEPT			2
#define XT_RECOVER_ERROR			3

/*
 * Block the calling thread until pbxt_recovery_state has reached at least
 * `state`, re-checking every 100 ms via xt_sleep_milli_second().
 *
 * self   Unused.
 * state  Minimum recovery state to wait for (presumably an XT_RECOVER_*
 *        value). Because XT_RECOVER_ERROR has the highest value, an error
 *        state also ends the wait.
 *
 * There is no timeout: if the state never advances, this never returns.
 */
inline void xt_xres_wait_for_recovery(XTThreadPtr XT_UNUSED(self), int state)
{
	for (;;) {
		if (pbxt_recovery_state >= state)
			return;
		xt_sleep_milli_second(100);
	}
}

#endif