~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/restart_xt.h

  • Committer: Padraig O'Sullivan
  • Date: 2009-07-22 23:26:26 UTC
  • mto: (1039.5.43 replication)
  • mto: This revision was merged to the branch mainline in revision 1130.
  • Revision ID: osullivan.padraig@gmail.com-20090722232626-mu4khq7ho6dqcf7q
Created a simple filtered replicator that can filter by schema name or table
name.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2007 PrimeBase Technologies GmbH
2
 
 *
3
 
 * PrimeBase XT
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * 2007-11-12   Paul McCullagh
20
 
 *
21
 
 * H&G2JCtL
22
 
 *
23
 
 * Restart and write data to the database.
24
 
 */
25
 
 
26
 
#ifndef __restart_xt_h__
27
 
#define __restart_xt_h__
28
 
 
29
 
#include "pthread_xt.h"
30
 
#include "filesys_xt.h"
31
 
#include "sortedlist_xt.h"
32
 
#include "util_xt.h"
33
 
#include "xactlog_xt.h"
34
 
 
35
 
struct XTThread;
36
 
struct XTOpenTable;
37
 
struct XTDatabase;
38
 
struct XTTable;
39
 
 
40
 
#ifdef XT_SORT_REC_WRITES
41
 
#ifdef TEST_SORT_REC_OVERFLOW
42
 
#define XT_TABLE_LIST_SIZE              5
43
 
#else
44
 
#define XT_TABLE_LIST_SIZE              223
45
 
#endif
46
 
#define XT_TABLE_LIST_INC               23
47
 
#endif
48
 
 
49
 
extern int                              pbxt_recovery_state;
50
 
 
51
 
typedef struct XTWriterState {
52
 
        struct XTDatabase               *ws_db;
53
 
        xtBool                                  ws_in_recover;
54
 
        xtLogID                                 ws_ind_rec_log_id;
55
 
        xtLogOffset                             ws_ind_rec_log_offset;
56
 
        XTXactSeqReadRec                ws_seqread;
57
 
        XTDataBufferRec                 ws_databuf;
58
 
        XTInfoBufferRec                 ws_rec_buf;
59
 
        xtTableID                               ws_tab_gone;                                    /* Cache the ID of the last table that does not exist. */
60
 
        xtTableID                               ws_tab_temp;                                    /* Cache the ID of the last temporary table. */
61
 
        struct XTOpenTable              *ws_ot;
62
 
#ifdef XT_SORT_REC_WRITES
63
 
        xtTableID                               ws_tab_flush_list[XT_TABLE_LIST_SIZE];
64
 
#endif
65
 
} XTWriterStateRec, *XTWriterStatePtr;
66
 
 
67
 
#define XT_CHECKPOINT_VERSION   1
68
 
 
69
 
typedef struct XTXlogCheckpoint {
70
 
        XTDiskValue2                    xcp_checksum_2;                                 /* The checksum of the all checkpoint data. */
71
 
        XTDiskValue4                    xcp_head_size_4;
72
 
        XTDiskValue2                    xcp_version_2;                                  /* The version of the checkpoint record. */
73
 
        XTDiskValue6                    xcp_chkpnt_no_6;                                /* Incremented for each checkpoint. */
74
 
        XTDiskValue4                    xcp_log_id_4;                                   /* The restart log ID. */
75
 
        XTDiskValue6                    xcp_log_offs_6;                                 /* The restart log offset. */
76
 
        XTDiskValue4                    xcp_tab_id_4;                                   /* The current high table ID. */
77
 
        XTDiskValue4                    xcp_xact_id_4;                                  /* The current high transaction ID. */
78
 
        XTDiskValue4                    xcp_ind_rec_log_id_4;                   /* The index recovery log ID. */
79
 
        XTDiskValue6                    xcp_ind_rec_log_offs_6;         /* The index recovery log offset. */
80
 
        XTDiskValue2                    xcp_log_count_2;                                /* Number of logs to be deleted in the area below. */
81
 
        XTDiskValue2                    xcp_del_log[XT_VAR_LENGTH];
82
 
} XTXlogCheckpointDRec, *XTXlogCheckpointDPtr;
83
 
 
84
 
typedef struct XTXactRestart {
85
 
        struct XTDatabase               *xres_db;
86
 
        int                                             xres_next_res_no;                               /* The next restart file to be written. */
87
 
        xtLogID                                 xres_cp_log_id;                                 /* Log number of the last checkpoint. */
88
 
        xtLogOffset                             xres_cp_log_offset;                             /* Log offset of the last checkpoint */
89
 
        xtBool                                  xres_cp_required;                               /* Checkpoint required (startup and shutdown). */
90
 
        xtWord8                                 xres_cp_number;                                 /* The checkpoint number (used to decide which is the latest checkpoint). */
91
 
 
92
 
public:
93
 
        void                                    xres_init(struct XTThread *self, struct XTDatabase *db, xtLogID *log_id, xtLogOffset *log_offset, xtLogID       *max_log_id);
94
 
        void                                    xres_exit(struct XTThread *self);
95
 
        xtBool                                  xres_is_checkpoint_pending(xtLogID log_id, xtLogOffset log_offset);
96
 
        void                                    xres_checkpoint_pending(xtLogID log_id, xtLogOffset log_offset);
97
 
        xtBool                                  xres_checkpoint(struct XTThread *self);
98
 
        void                                    xres_name(size_t size, char *path, xtLogID log_id);
99
 
 
100
 
private:
101
 
        xtBool                                  xres_check_checksum(XTXlogCheckpointDPtr buffer, size_t size);
102
 
        void                                    xres_recover_progress(XTThreadPtr self, XTOpenFilePtr *of, int perc);
103
 
        xtBool                                  xres_restart(struct XTThread *self, xtLogID *log_id, xtLogOffset *log_offset, xtLogID ind_rec_log_id, off_t ind_rec_log_offset, xtLogID *max_log_id);
104
 
        off_t                                   xres_bytes_to_read(struct XTThread *self, struct XTDatabase *db, u_int *log_count, xtLogID *max_log_id);
105
 
} XTXactRestartRec, *XTXactRestartPtr;
106
 
 
107
 
typedef struct XTCheckPointState {
108
 
        xtBool                                  cp_inited;                                              /* TRUE if structure was inited */
109
 
        xt_mutex_type                   cp_state_lock;                                  /* Lock and the entire checkpoint state. */
110
 
        xtBool                                  cp_running;                                             /* TRUE if a checkpoint is running. */
111
 
        xtLogID                                 cp_log_id;
112
 
        xtLogOffset                             cp_log_offset;
113
 
        xtLogID                                 cp_ind_rec_log_id;
114
 
        xtLogOffset                             cp_ind_rec_log_offset;
115
 
        XTSortedListPtr                 cp_table_ids;                                   /* List of tables to be flushed for the checkpoint. */
116
 
        u_int                                   cp_flush_count;                                 /* The number of tables flushed. */
117
 
        u_int                                   cp_next_to_flush;                               /* The next table to be flushed. */
118
 
} XTCheckPointStateRec, *XTCheckPointStatePtr;
119
 
 
120
 
#define XT_CPT_NONE_FLUSHED                     0
121
 
#define XT_CPT_REC_ROW_FLUSHED          1
122
 
#define XT_CPT_INDEX_FLUSHED            2
123
 
#define XT_CPT_REC_ROW_FLUSHING         4
124
 
#define XT_CPT_INDEX_FLUSHING           8
125
 
#define XT_CPT_ALL_FLUSHED                      (XT_CPT_REC_ROW_FLUSHED | XT_CPT_INDEX_FLUSHED)
126
 
 
127
 
#define XT_CPT_STATE_START_REC_ROW      1
128
 
#define XT_CPT_STATE_STOP_REC_ROW       2
129
 
#define XT_CPT_STATE_DONE_REC_ROW       3
130
 
#define XT_CPT_STATE_START_INDEX        4
131
 
#define XT_CPT_STATE_STOP_INDEX         5
132
 
#define XT_CPT_STATE_DONE_INDEX         6
133
 
#define XT_CPT_STATE_DONE_ALL           7
134
 
 
135
 
typedef struct XTCheckPointTable {
136
 
        u_int                                   cpt_flushed;
137
 
        xtTableID                               cpt_tab_id;
138
 
} XTCheckPointTableRec, *XTCheckPointTablePtr;
139
 
 
140
 
void xt_xres_init(struct XTThread *self, struct XTDatabase *db);
141
 
void xt_xres_exit(struct XTThread *self, struct XTDatabase *db);
142
 
 
143
 
void xt_xres_init_tab(struct XTThread *self, struct XTTable *tab);
144
 
void xt_xres_exit_tab(struct XTThread *self, struct XTTable *tab);
145
 
 
146
 
#ifdef XT_SORT_REC_WRITES
147
 
xtBool  xt_xres_delay_flush(struct XTOpenTable *ot, xtBool lock);
148
 
void    xt_xres_flush_all(struct XTThread *self, XTWriterStatePtr ws);
149
 
#endif
150
 
 
151
 
void    xt_xres_apply_in_order(struct XTThread *self, XTWriterStatePtr ws, xtLogID log_id, xtLogOffset log_offset, XTXactLogBufferDPtr record);
152
 
 
153
 
xtBool  xt_begin_checkpoint(struct XTDatabase *db, xtBool have_table_lock, struct XTThread *thread);
154
 
xtBool  xt_end_checkpoint(struct XTDatabase *db, struct XTThread *thread, xtBool *checkpoint_done);
155
 
void    xt_checkpoint_set_flush_state(struct XTDatabase *db, xtTableID tab_id, int state);
156
 
void    xt_checkpoint_flush_done(struct XTDatabase *db, xtTableID tab_id, int flush_bit);
157
 
void    xt_start_checkpointer(struct XTThread *self, struct XTDatabase *db);
158
 
void    xt_wait_for_checkpointer(struct XTThread *self, struct XTDatabase *db);
159
 
void    xt_stop_checkpointer(struct XTThread *self, struct XTDatabase *db);
160
 
void    xt_wake_checkpointer(struct XTThread *self, struct XTDatabase *db);
161
 
void    xt_free_writer_state(struct XTThread *self, XTWriterStatePtr ws);
162
 
xtWord8 xt_bytes_since_last_checkpoint(struct XTDatabase *db, xtLogID curr_log_id, xtLogOffset curr_log_offset);
163
 
 
164
 
void xt_print_log_record(xtLogID log, off_t offset, XTXactLogBufferDPtr record);
165
 
void xt_dump_xlogs(struct XTDatabase *db, xtLogID start_log);
166
 
 
167
 
void xt_xres_start_database_recovery(XTThreadPtr self);
168
 
void xt_xres_terminate_recovery(XTThreadPtr self);
169
 
 
170
 
void xt_start_flusher(struct XTThread *self, struct XTDatabase *db);
171
 
void xt_stop_flusher(struct XTThread *self, struct XTDatabase *db);
172
 
 
173
 
#define XT_RECOVER_PENDING                      0
174
 
#define XT_RECOVER_DONE                         1
175
 
#define XT_RECOVER_SWEPT                        2
176
 
#define XT_RECOVER_ERROR                        3
177
 
 
178
 
inline void xt_xres_wait_for_recovery(XTThreadPtr XT_UNUSED(self), int state)
179
 
{
180
 
        while (pbxt_recovery_state < state)
181
 
                xt_sleep_milli_second(100);
182
 
}
183
 
 
184
 
#endif