~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/trans_log_ms.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
mergeĀ lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2009-06-09
22
 
 *
23
 
 * H&G2JCtL
24
 
 *
25
 
 * PBMS transaction handling.
26
 
 *
27
 
 * PBMS uses 1 circular transaction log. All BLOB reference operations are written to this log
28
 
 * and are applied to the repository when committed. There is 1 thread dedicated to reading the 
29
 
 * transaction log and applying the changes. During an engine level backup this thread is suspended 
30
 
 * so that no transactions will be applied to the repository files as they are backed up.
31
 
 *
32
 
 */
33
 
#include "cslib/CSConfig.h"
34
 
 
35
 
#include <stdlib.h>
36
 
#include <inttypes.h>
37
 
 
38
 
#include "cslib/CSGlobal.h"
39
 
#include "cslib/CSStrUtil.h"
40
 
#include "cslib/CSStorage.h"
41
 
 
42
 
#include "trans_log_ms.h"
43
 
#include "trans_cache_ms.h"
44
 
 
45
 
#ifdef CRASH_TEST
46
 
uint32_t        trans_test_crash_point;
47
 
#define CRASH_POINT(p) { if (p == trans_test_crash_point) { char *ptr = NULL; printf("Crash on demand at: %s(%d), start: %"PRIu64", eol: %"PRIu64"\n", __FILE__, __LINE__, txn_Start, txn_EOL); *ptr = 88;}}
48
 
#else
49
 
#define CRASH_POINT(p)
50
 
#endif
51
 
 
52
 
#define MS_TRANS_LOG_MAGIC                      0xA6E7D7B3
53
 
#define MS_TRANS_LOG_VERSION            1
54
 
#define MS_TRANS_LOG_RECOVERED          0XA1
55
 
#define MS_TRANS_LOG_NOT_RECOVERED      0XA2
56
 
#define MS_TRANS_NO_OVERFLOW            0XB1
57
 
#define MS_TRANS_OVERFLOW                       0XB2
58
 
 
59
 
#define DFLT_TRANS_CHECKPOINT_THRESHOLD 1024
60
 
 
61
 
#define DFLT_TRANS_LOG_LIST_SIZE        (1024 * 10)
62
 
#define DFLT_TRANS_CACHE_SIZE           (500)
63
 
 
64
 
#define TRANS_CAN_RESIZE        ((txn_MaxRecords != txn_ReqestedMaxRecords) && (txn_EOL >= txn_Start) && !txn_HaveOverflow)
65
 
 
66
 
typedef struct MSDiskTrans {
67
 
        CSDiskValue4    dtr_id_4;                       // The transaction ID
68
 
        CSDiskValue1    dtr_type_1;                     // The transaction type. If the first bit is set then the transaction is an autocommit.
69
 
        CSDiskValue1    dtr_check_1;            // The trransaction record checksum.
70
 
        CSDiskValue4    dtr_db_id_4;            // The database ID for the operation.
71
 
        CSDiskValue4    dtr_tab_id_4;           // The table ID for the operation.
72
 
        CSDiskValue8    dtr_blob_id_8;          // The blob ID for the operation.
73
 
        CSDiskValue8    dtr_blob_ref_id_8;      // The blob reference id.
74
 
} MSDiskTransRec, *MSDiskTransPtr;
75
 
 
76
 
#define SET_DISK_TRANSREC(d, s) { \
77
 
        CS_SET_DISK_4((d)->dtr_id_4, (s)->tr_id);\
78
 
        CS_SET_DISK_1((d)->dtr_type_1, (s)->tr_type);\
79
 
        CS_SET_DISK_1((d)->dtr_check_1, (s)->tr_check);\
80
 
        CS_SET_DISK_4((d)->dtr_db_id_4, (s)->tr_db_id);\
81
 
        CS_SET_DISK_4((d)->dtr_tab_id_4, (s)->tr_tab_id);\
82
 
        CS_SET_DISK_8((d)->dtr_blob_id_8, (s)->tr_blob_id);\
83
 
        CS_SET_DISK_8((d)->dtr_blob_ref_id_8, (s)->tr_blob_ref_id);\
84
 
}
85
 
 
86
 
#define GET_DISK_TRANSREC(s, d) { \
87
 
        (s)->tr_id = CS_GET_DISK_4((d)->dtr_id_4);\
88
 
        (s)->tr_type = CS_GET_DISK_1((d)->dtr_type_1);\
89
 
        (s)->tr_check = CS_GET_DISK_1((d)->dtr_check_1);\
90
 
        (s)->tr_db_id = CS_GET_DISK_4((d)->dtr_db_id_4);\
91
 
        (s)->tr_tab_id = CS_GET_DISK_4((d)->dtr_tab_id_4);\
92
 
        (s)->tr_blob_id = CS_GET_DISK_8((d)->dtr_blob_id_8);\
93
 
        (s)->tr_blob_ref_id = CS_GET_DISK_8((d)->dtr_blob_ref_id_8);\
94
 
}
95
 
 
96
 
static uint8_t checksum(uint8_t *data, size_t len)
97
 
{
98
 
        register uint32_t       sum = 0, g;
99
 
        uint8_t                         *chk;
100
 
 
101
 
        chk = data + len - 1;
102
 
        while (chk > data) {
103
 
                sum = (sum << 4) + *chk;
104
 
                if ((g = sum & 0xF0000000)) {
105
 
                        sum = sum ^ (g >> 24);
106
 
                        sum = sum ^ g;
107
 
                }
108
 
                chk--;
109
 
        }
110
 
        return (uint8_t) (sum ^ (sum >> 24) ^ (sum >> 16) ^ (sum >> 8));
111
 
}
112
 
 
113
 
MSTrans::MSTrans() : 
114
 
        CSSharedRefObject(),
115
 
        txn_MaxCheckPoint(0),
116
 
        txn_Doingbackup(false),
117
 
        txn_reader(NULL),
118
 
        txn_IsTxnValid(false),
119
 
        txn_CurrentTxn(0),
120
 
        txn_TxnIndex(0),
121
 
        txn_StartCheckPoint(0),
122
 
        txn_TransCache(NULL),
123
 
        txn_BlockingTransaction(0),
124
 
        txn_File(NULL),
125
 
        txn_EOLCheckPoint(0),
126
 
        txn_MaxRecords(0),
127
 
        txn_ReqestedMaxRecords(0),
128
 
        txn_HighWaterMark(0),
129
 
        txn_OverflowCount(0),
130
 
        txn_MaxTID(0),
131
 
        txn_Recovered(false),
132
 
        txn_HaveOverflow(false),
133
 
        txn_Overflow(0),
134
 
        txn_EOL(0),
135
 
        txn_Start(0),
136
 
        txn_Checksum(0)
137
 
{
138
 
}
139
 
 
140
 
MSTrans::~MSTrans()
141
 
{
142
 
        txn_Close();
143
 
        if (txn_TransCache) 
144
 
                txn_TransCache->release();
145
 
                
146
 
}       
147
 
 
148
 
void MSTrans::txn_Close()
149
 
{
150
 
        
151
 
        if (txn_File) {         
152
 
                // Set the header to indicate that the log has not been closed properly. 
153
 
                CS_SET_DISK_4(txn_DiskHeader.th_next_txn_id_4, txn_MaxTID);
154
 
                txn_File->write(&(txn_DiskHeader.th_next_txn_id_4), offsetof(MSDiskTransHeadRec, th_next_txn_id_4), 4 );
155
 
 
156
 
                CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
157
 
                CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
158
 
                CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
159
 
                txn_File->write(&(txn_DiskHeader.th_start_8), 
160
 
                                                        offsetof(MSDiskTransHeadRec, th_start_8), 
161
 
                                                        sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
162
 
CRASH_POINT(1);
163
 
                txn_File->flush();
164
 
                txn_File->sync();
165
 
 
166
 
                if (txn_Recovered) {
167
 
                        // Write the recovered flag seperately just incase of a crash during the write operation.
168
 
                        CS_SET_DISK_1(txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);           
169
 
                        txn_File->write(&(txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1 );
170
 
                        txn_File->flush();
171
 
                        txn_File->sync();
172
 
                }
173
 
                
174
 
                txn_File->close();
175
 
                txn_File->release();
176
 
                txn_File = NULL;                
177
 
        }
178
 
}
179
 
void MSTrans::txn_SetFile(CSFile *tr_file)
180
 
{
181
 
        txn_File = tr_file;
182
 
}
183
 
 
184
 
//#define TRACE_ALL
185
 
#ifdef TRACE_ALL
186
 
static FILE *txn_debug_log;
187
 
#endif
188
 
 
189
 
MSTrans *MSTrans::txn_NewMSTrans(const char *log_path, bool dump_log)
190
 
{
191
 
        MSTrans *trans = NULL;
192
 
        CSPath *path = NULL;
193
 
        uint64_t log_size;
194
 
        enter_();
195
 
 
196
 
        (void) dump_log;
197
 
        
198
 
        new_(trans, MSTrans());
199
 
        push_(trans);
200
 
 
201
 
        path = CSPath::newPath(log_path);
202
 
        push_(path);    
203
 
                
204
 
try_again:
205
 
        
206
 
        
207
 
        if (!path->exists()) { // Create the transaction log.   
208
 
                // Preallocate the log space and initialize it.
209
 
                MSDiskTransRec *recs;
210
 
                off64_t offset = sizeof(MSDiskTransHeadRec);
211
 
                uint64_t num_records = DFLT_TRANS_LOG_LIST_SIZE;
212
 
                size_t size;
213
 
                CSFile *tr_file;
214
 
                
215
 
                recs = (MSDiskTransRec *) cs_calloc(1024 * sizeof(MSDiskTransRec));
216
 
                push_ptr_(recs);
217
 
                
218
 
                tr_file = path->createFile(CSFile::CREATE);
219
 
                push_(tr_file);
220
 
                
221
 
                log_size = DFLT_TRANS_LOG_LIST_SIZE * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
222
 
 
223
 
                
224
 
                while (num_records) {
225
 
                        if (num_records < 1024)
226
 
                                size = num_records;
227
 
                        else
228
 
                                size = 1024;
229
 
                        tr_file->write(recs, offset, size * sizeof(MSDiskTransRec));
230
 
                        offset += size * sizeof(MSDiskTransRec);
231
 
                        num_records -= size;
232
 
                }
233
 
 
234
 
                trans->txn_MaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
235
 
                trans->txn_ReqestedMaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
236
 
                trans->txn_MaxCheckPoint = DFLT_TRANS_CHECKPOINT_THRESHOLD;
237
 
                trans->txn_MaxTID = 1;
238
 
 
239
 
                // Initialize the log header.
240
 
                CS_SET_DISK_4(trans->txn_DiskHeader.th_magic_4, MS_TRANS_LOG_MAGIC);
241
 
                CS_SET_DISK_2(trans->txn_DiskHeader.th_version_2, MS_TRANS_LOG_VERSION);
242
 
                
243
 
                CS_SET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4, trans->txn_MaxTID);
244
 
 
245
 
                CS_SET_DISK_2(trans->txn_DiskHeader.th_check_point_2, trans->txn_MaxCheckPoint);
246
 
 
247
 
                CS_SET_DISK_8(trans->txn_DiskHeader.th_list_size_8, trans->txn_MaxRecords);
248
 
                CS_SET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8, trans->txn_ReqestedMaxRecords);
249
 
 
250
 
                CS_SET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4, DFLT_TRANS_CACHE_SIZE);
251
 
                
252
 
                CS_SET_DISK_8(trans->txn_DiskHeader.th_start_8, 0);
253
 
                CS_SET_DISK_8(trans->txn_DiskHeader.th_eol_8, 0);
254
 
                
255
 
                CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
256
 
                CS_SET_DISK_1(trans->txn_DiskHeader.th_checksum_1, 1);
257
 
                CS_SET_DISK_1(trans->txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
258
 
                
259
 
                tr_file->write(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec));
260
 
                pop_(tr_file);
261
 
                trans->txn_SetFile(tr_file);
262
 
                
263
 
                trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
264
 
                
265
 
                trans->txn_TransCache = MSTransCache::newMSTransCache(DFLT_TRANS_CACHE_SIZE);
266
 
                
267
 
                release_(recs);
268
 
 
269
 
        } else { // The transaction log already exists
270
 
                bool overflow = false, recovered = false;
271
 
                
272
 
                CSFile *tr_file = path->createFile(CSFile::DEFAULT); // Open read/write
273
 
                push_(tr_file);
274
 
                
275
 
                // Read the log header:
276
 
                if (tr_file->read(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec), 0) < sizeof(MSDiskTransHeadRec)) {
277
 
                        release_(tr_file);
278
 
                        path->removeFile();
279
 
                        goto try_again;
280
 
                }
281
 
                
282
 
                // check the log header:                
283
 
                if (CS_GET_DISK_4(trans->txn_DiskHeader.th_magic_4) != MS_TRANS_LOG_MAGIC)
284
 
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_HEADER_MAGIC);
285
 
                
286
 
                if (CS_GET_DISK_2(trans->txn_DiskHeader.th_version_2) != MS_TRANS_LOG_VERSION)
287
 
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_VERSION_TOO_NEW);
288
 
                        
289
 
                //----
290
 
                if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_NO_OVERFLOW) 
291
 
                        overflow = false;
292
 
                else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_OVERFLOW) 
293
 
                        overflow = true;
294
 
                else 
295
 
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
296
 
                
297
 
                //----
298
 
                if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_NOT_RECOVERED) 
299
 
                        recovered = false;
300
 
                else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_RECOVERED) 
301
 
                        recovered = true;
302
 
                else 
303
 
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
304
 
 
305
 
                // Check that the log is the expected size.
306
 
                log_size = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8) * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
307
 
                
308
 
                if ((log_size > tr_file->getEOF()) || 
309
 
                        ((log_size < tr_file->getEOF()) && !overflow)){ 
310
 
                                        
311
 
                        char buffer[CS_EXC_MESSAGE_SIZE];               
312
 
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unexpected transaction log size: ");
313
 
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, path->getCString());
314
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_BAD_FILE_HEADER, buffer);
315
 
                }
316
 
                
317
 
                trans->txn_MaxTID = CS_GET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4);
318
 
 
319
 
                // Looks good, we will assume it is a valid log file.
320
 
                trans->txn_TransCache = MSTransCache::newMSTransCache(CS_GET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4));
321
 
 
322
 
                pop_(tr_file);
323
 
                trans->txn_SetFile(tr_file);
324
 
 
325
 
                trans->txn_MaxCheckPoint = CS_GET_DISK_2(trans->txn_DiskHeader.th_check_point_2);
326
 
                
327
 
                trans->txn_MaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8);
328
 
                trans->txn_ReqestedMaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8);
329
 
                
330
 
                trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
331
 
                trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
332
 
                trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
333
 
                trans->txn_HaveOverflow = overflow;
334
 
                if (overflow) 
335
 
                        trans->txn_Overflow = (tr_file->getEOF() - sizeof(MSDiskTransHeadRec)) /sizeof(MSDiskTransRec); 
336
 
                else
337
 
                        trans->txn_Overflow = 0;
338
 
 
339
 
#ifdef DEBUG
340
 
                if (overflow)
341
 
                        printf("Recovering overflow log\n");
342
 
                if (dump_log) {
343
 
                        char name[100];
344
 
                        snprintf(name, 100, "%dms-trans-log.dump", (int)time(NULL));
345
 
                        trans->txn_DumpLog(name);
346
 
                }
347
 
#endif
348
 
                // Recover the log if required.
349
 
                if (!recovered)
350
 
                        trans->txn_Recover();
351
 
                        
352
 
                        
353
 
        }
354
 
        
355
 
        trans->txn_Recovered = true; // Any recovery required has been completed.
356
 
 
357
 
        // The log has been recovered so these values should be valid:
358
 
        trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
359
 
        trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
360
 
        
361
 
        // Set the header to indicate that the log has not been closed properly. 
362
 
        // This is reset when the log is closed during shutdown.
363
 
        CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_NOT_RECOVERED);
364
 
        trans->txn_File->write(&(trans->txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1);
365
 
        
366
 
        // Load the transaction records into memory.
367
 
        trans->txn_TransCache->tc_StartCacheReload(true);
368
 
        trans->txn_LoadTransactionCache(trans->txn_Start);
369
 
        trans->txn_TransCache->tc_CompleteCacheReload();
370
 
        
371
 
        if (trans->txn_MaxRecords != trans->txn_ReqestedMaxRecords) 
372
 
                trans->txn_ResizeLog(); // Try to resize but it may not be possible yet.
373
 
        
374
 
        release_(path);
375
 
        pop_(trans);
376
 
 
377
 
#ifdef TRACE_ALL
378
 
        
379
 
        txn_debug_log = fopen("log_dump.txt", "w+");
380
 
        if (!txn_debug_log) {
381
 
                perror("log_dump.txt");
382
 
        }
383
 
#endif
384
 
        
385
 
        return_(trans);
386
 
}
387
 
 
388
 
bool MSTrans::txn_ValidRecord(MSTransPtr rec)
389
 
{
390
 
        uint8_t check = rec->tr_check;
391
 
        bool ok;
392
 
        
393
 
        rec->tr_check = txn_Checksum;
394
 
        ok = (checksum((uint8_t*)rec, sizeof(MSTransRec)) == check);
395
 
        rec->tr_check = check;
396
 
        return ok;
397
 
}
398
 
 
399
 
void MSTrans::txn_GetRecordAt(uint64_t index, MSTransPtr rec)
400
 
{
401
 
        MSDiskTransRec drec;
402
 
        off64_t offset;
403
 
        
404
 
        // Read 1 record from the log and convert it from disk format.
405
 
        offset = sizeof(MSDiskTransHeadRec) + index * sizeof(MSDiskTransRec);           
406
 
        txn_File->read(&drec, offset, sizeof(MSDiskTransRec), sizeof(MSDiskTransRec));
407
 
        GET_DISK_TRANSREC(rec, &drec);
408
 
}
409
 
 
410
 
// Recovery involves finding the start of the first record and the eof
411
 
// position. The positions will be found at or after the position stored
412
 
// in the header. 
413
 
void MSTrans::txn_Recover()
414
 
{
415
 
        MSTransRec rec = {0,0,0,0,0,0,0};
416
 
        uint64_t original_eol = txn_EOL;
417
 
        enter_();
418
 
 
419
 
#ifdef DEBUG
420
 
printf("Recovering      transaction log!\n");
421
 
#endif
422
 
 
423
 
        txn_MaxTID = 0;
424
 
        // Search for the last valid record in the log starting from the last 
425
 
        // known position stored in the header.
426
 
        for (; txn_EOL < txn_MaxRecords; txn_EOL++) {
427
 
                txn_GetRecordAt(txn_EOL, &rec);
428
 
                if (! txn_ValidRecord(&rec))
429
 
                        break;  
430
 
        }
431
 
        
432
 
        if (txn_EOL == txn_MaxRecords) {
433
 
                // It looks like all the records in the log are valid?
434
 
                // This is strange but could happen if the crash
435
 
                // occurred just before updating the header as the
436
 
                // eol position rolled over to the top of the log.
437
 
                txn_EOL = 0;
438
 
        }
439
 
        
440
 
        txn_MaxTID++;
441
 
        
442
 
        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
443
 
        
444
 
        // If the actual eol has moved pass the recorded start position
445
 
        // then the actuall start position must be some where beyond
446
 
        // the eol.
447
 
        if (((original_eol < txn_Start) || (original_eol > txn_EOL)) && (txn_EOL >= txn_Start)) 
448
 
                txn_Start = txn_EOL +1; 
449
 
 
450
 
        // Position the start at the beginning of a transaction.        
451
 
        uint64_t end_search = (txn_Start < txn_EOL)? txn_EOL : txn_MaxRecords;
452
 
        for (; txn_Start < end_search; txn_Start++) {
453
 
                txn_GetRecordAt(txn_Start, &rec);
454
 
                if (TRANS_IS_START(rec.tr_type))
455
 
                        break;  
456
 
        }
457
 
 
458
 
        if (txn_Start == end_search)
459
 
                txn_Start = txn_EOL; 
460
 
                
461
 
        CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
462
 
        
463
 
        txn_TransCache->tc_SetRecovering(true); 
464
 
        // Load the transaction records into the cache.
465
 
        txn_TransCache->tc_StartCacheReload(true);
466
 
        txn_LoadTransactionCache(txn_Start);
467
 
        txn_TransCache->tc_CompleteCacheReload();
468
 
        
469
 
        // Now go through all the transactions and add rollbacks for any
470
 
        // unterminated transactions.
471
 
        TRef ref;
472
 
        bool terminated;
473
 
        while  (txn_TransCache->tc_GetTransaction(&ref, &terminated)) {
474
 
                
475
 
                txn_MaxTID = txn_TransCache->tc_GetTransactionID(ref); // Save the TID of the last transaction.
476
 
                if (!terminated) {
477
 
                        self->myTID = txn_MaxTID;
478
 
                        self->myTransRef = ref;
479
 
                        self->myStartTxn = false;
480
 
                        txn_AddTransaction(MS_RecoveredTxn);
481
 
                }
482
 
CRASH_POINT(2);
483
 
                txn_TransCache->tc_FreeTransaction(ref);
484
 
                
485
 
                // Load the next block of transactions into the cache.
486
 
                // This needs to be done after each tc_GetTransaction() to make sure
487
 
                // that if the transaction terminator is some where in the log
488
 
                // it will get read even if the cache is completely full. 
489
 
                if (txn_TransCache->tc_ShoulReloadCache()) {
490
 
                        txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload(true));
491
 
                        txn_TransCache->tc_CompleteCacheReload();
492
 
                }
493
 
        }
494
 
        
495
 
        
496
 
        txn_TransCache->tc_SetRecovering(false); 
497
 
        self->myTransRef = 0;
498
 
        
499
 
        // Update the header again incase rollbacks have been added.
500
 
        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
501
 
                                
502
 
        exit_();
503
 
}
504
 
 
505
 
bool ReadTXNLog::rl_CanContinue() 
506
 
507
 
        return rl_log->txn_TransCache->tc_ContinueCacheReload();
508
 
}
509
 
 
510
 
void ReadTXNLog::rl_Load(uint64_t log_position, MSTransPtr rec) 
511
 
{
512
 
        rl_log->txn_TransCache->tc_AddRec(log_position, rec);
513
 
}
514
 
 
515
 
void ReadTXNLog::rl_Store(uint64_t log_position, MSTransPtr rec) 
516
 
{
517
 
        MSDiskTransRec drec;
518
 
        SET_DISK_TRANSREC(&drec, rec);
519
 
 
520
 
        rl_log->txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + log_position * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec));
521
 
}
522
 
 
523
 
void ReadTXNLog::rl_Flush() 
524
 
{
525
 
        rl_log->txn_File->flush();
526
 
        rl_log->txn_File->sync();
527
 
}
528
 
 
529
 
void ReadTXNLog::rl_ReadLog(uint64_t read_start, bool log_locked)
530
 
{
531
 
        uint64_t        size, orig_size;
532
 
        bool reading_overflow = (read_start >= rl_log->txn_MaxRecords);
533
 
        enter_();
534
 
        
535
 
 
536
 
        // Get the number of transaction records to be loaded.
537
 
        if (reading_overflow) {
538
 
                orig_size = rl_log->txn_Overflow;
539
 
                size = rl_log->txn_Overflow - read_start;
540
 
        } else {
541
 
                orig_size = rl_log->txn_GetNumRecords();
542
 
        
543
 
                if (rl_log->txn_Start <= read_start)
544
 
                        size = orig_size - (read_start - rl_log->txn_Start);
545
 
                else 
546
 
                        size = rl_log->txn_EOL - read_start;
547
 
        }
548
 
        
549
 
        // load all the records
550
 
        while (size && rl_CanContinue()) {
551
 
                MSDiskTransRec diskRecords[1000];
552
 
                uint32_t read_size;
553
 
                off64_t offset;
554
 
                
555
 
                if (size > 1000) 
556
 
                        read_size = 1000 ;
557
 
                else
558
 
                        read_size = size ;
559
 
                
560
 
                // Check if we have reached the wrap around point in the log.
561
 
                if ((!reading_overflow) && (rl_log->txn_EOL < read_start) && ((rl_log->txn_MaxRecords - read_start) < read_size))
562
 
                        read_size = rl_log->txn_MaxRecords - read_start ;
563
 
 
564
 
                // Read the next block of records.
565
 
                offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);              
566
 
                rl_log->txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
567
 
                
568
 
                // Convert the records from disk format and add them to the cache.
569
 
                for (uint32_t i = 0; i < read_size && rl_CanContinue(); i++) {
570
 
                        MSTransRec rec;
571
 
                        MSDiskTransPtr drec = diskRecords + i;
572
 
                        GET_DISK_TRANSREC(&rec, drec);
573
 
                        
574
 
                        rl_Load(read_start + i, &rec); 
575
 
                }
576
 
                
577
 
                size -= read_size;
578
 
                read_start += read_size;
579
 
                if (read_start == rl_log->txn_MaxRecords)
580
 
                        read_start = 0;
581
 
        }
582
 
        
583
 
        if (rl_log->txn_HaveOverflow && !reading_overflow) {
584
 
                if (rl_CanContinue()) 
585
 
                        rl_ReadLog(rl_log->txn_MaxRecords, false);
586
 
                
587
 
        } else if (!log_locked) {       
588
 
                // The following is intended to prevent the case where a writer 
589
 
                // writes an txn record while the cache is full but just after 
590
 
                // the reload has completed. If the cache is not yet full we need
591
 
                // to load as many of the new records into cache as possible.
592
 
        
593
 
                uint64_t        new_size;
594
 
                lock_(rl_log);
595
 
                if (reading_overflow)
596
 
                        new_size = rl_log->txn_Overflow;
597
 
                else 
598
 
                        new_size = rl_log->txn_GetNumRecords();
599
 
                if (rl_CanContinue() && (orig_size != new_size)) {
600
 
                        rl_ReadLog(read_start, true);
601
 
                }
602
 
                unlock_(rl_log);
603
 
        }
604
 
        
605
 
 
606
 
        exit_();
607
 
}
608
 
 
609
 
void MSTrans::txn_LoadTransactionCache(uint64_t read_start)
610
 
{
611
 
        ReadTXNLog log(this);
612
 
        enter_();
613
 
        log.rl_ReadLog(read_start, false);
614
 
        txn_TransCache->tc_UpdateCacheVersion(); // Signal writes to recheck cache for overflow txn refs.
615
 
        exit_();
616
 
}
617
 
 
618
 
void  MSTrans::txn_ResizeLog()
619
 
{
620
 
        enter_();
621
 
        
622
 
        lock_(this);
623
 
        if (TRANS_CAN_RESIZE) {
624
 
                // TRANS_CAN_RESIZE checks that there is no overflow and the the start position 
625
 
                // is less than eol. This implies the from eol to the end of file doesn't contain
626
 
                // and used records.
627
 
                
628
 
 
629
 
#ifdef DEBUG    
630
 
                uint64_t old_size = txn_MaxRecords;
631
 
#endif          
632
 
                if (txn_MaxRecords > txn_ReqestedMaxRecords) { // Shrink the log
633
 
                        uint64_t max_resize = txn_MaxRecords - txn_EOL;
634
 
                        
635
 
                        if ( txn_Start == txn_EOL)
636
 
                                max_resize = txn_MaxRecords;
637
 
                        else {
638
 
                                max_resize = txn_MaxRecords - txn_EOL;
639
 
                                if (!txn_Start) // If start is at '0' then the EOL cannot be wrapped.
640
 
                                        max_resize--;
641
 
                        }
642
 
                        
643
 
                                
644
 
                        if (max_resize > (txn_MaxRecords - txn_ReqestedMaxRecords))
645
 
                                max_resize = txn_MaxRecords - txn_ReqestedMaxRecords;
646
 
                                                        
647
 
                        txn_MaxRecords -=       max_resize;
648
 
                } else
649
 
                        txn_MaxRecords = txn_ReqestedMaxRecords; // Grow the log
650
 
 
651
 
#ifdef DEBUG                    
652
 
                char buffer[CS_EXC_MESSAGE_SIZE];               
653
 
                snprintf(buffer, CS_EXC_MESSAGE_SIZE, "Resizing the Transaction log from %"PRIu64" to %"PRIu64" \n",  old_size, txn_MaxRecords);
654
 
                CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, buffer);
655
 
#endif
656
 
 
657
 
                CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
658
 
                
659
 
                txn_File->setEOF(txn_MaxRecords * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec));
660
 
                txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
661
 
                
662
 
                if (txn_Start == txn_EOL) {
663
 
                        txn_Start = 0;
664
 
                        txn_EOL = 0;
665
 
                } else if (txn_MaxRecords == txn_EOL) {
666
 
                        txn_EOL = 0;
667
 
                }
668
 
                
669
 
                txn_ResetEOL();
670
 
                                
671
 
        }       
672
 
        unlock_(this);
673
 
        
674
 
        exit_();
675
 
}
676
 
 
677
 
void  MSTrans::txn_ResetEOL()
678
 
{
679
 
        enter_();
680
 
        
681
 
        txn_EOLCheckPoint = txn_MaxCheckPoint;
682
 
        txn_StartCheckPoint = txn_MaxCheckPoint;
683
 
        
684
 
        if (!txn_EOL)
685
 
                txn_Checksum++;
686
 
        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
687
 
        CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
688
 
        CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
689
 
        txn_File->write(&(txn_DiskHeader.th_start_8), 
690
 
                                                offsetof(MSDiskTransHeadRec, th_start_8), 
691
 
                                                sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
692
 
CRASH_POINT(5);
693
 
        txn_File->flush();
694
 
        txn_File->sync();
695
 
CRASH_POINT(10);
696
 
                
697
 
        exit_();
698
 
}
699
 
 
700
 
#define PRINT_TRANS(tid, a, t) 
701
 
 
702
 
#ifndef PRINT_TRANS
703
 
#define PRINT_TRANS(tid, a, t) printTrans(tid, a, t)
704
 
static void printTrans(uint32_t tid, bool autocommit, MS_Txn type)
705
 
{
706
 
        const char *type_name = "???";
707
 
        
708
 
        switch (type) {
709
 
                case MS_RollBackTxn:
710
 
                        type_name = "Rollback";
711
 
                        break;
712
 
                case MS_PartialRollBackTxn:
713
 
                        type_name = "PartialRollBack";
714
 
                        break;
715
 
                case MS_CommitTxn:
716
 
                        type_name = "Commit";
717
 
                        break;
718
 
                case MS_ReferenceTxn:
719
 
                        type_name = "Reference";
720
 
                        break;
721
 
                case MS_DereferenceTxn:
722
 
                        type_name = "Dereference";
723
 
                        break;
724
 
                case MS_RecoveredTxn:
725
 
                        type_name = "Recovered";
726
 
                        break;
727
 
        }
728
 
        
729
 
        fprintf(stderr, "MSTrans::txn_LogTransaction(%d, autocommit = %s, %s)\n", tid, (autocommit)?"On":"Off", type_name);
730
 
 
731
 
}
732
 
#endif
733
 
 
734
 
void MSTrans::txn_LogTransaction(MS_Txn type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
735
 
{
736
 
        enter_();
737
 
        
738
 
        lock_(this);
739
 
        if (!self->myTID) {
740
 
                switch (type) {
741
 
                        case MS_RollBackTxn:
742
 
                        case MS_PartialRollBackTxn:
743
 
                        case MS_CommitTxn: {
744
 
                                unlock_(this);
745
 
                                exit_();;
746
 
                        }
747
 
                        case MS_ReferenceTxn:
748
 
                        case MS_DereferenceTxn:
749
 
                        case MS_RecoveredTxn:
750
 
                                break;
751
 
                }
752
 
                txn_MaxTID++;
753
 
                self->myTID = txn_MaxTID;
754
 
                self->myTransRef = TRANS_CACHE_NEW_REF;
755
 
                self->myStartTxn = true;
756
 
        }
757
 
 
758
 
        PRINT_TRANS(self->myTID, autocommit, type);
759
 
        
760
 
        txn_AddTransaction(type, autocommit, db_id, tab_id, blob_id, blob_ref_id);
761
 
        if (autocommit || TRANS_TYPE_IS_TERMINATED(type))
762
 
                txn_NewTransaction();
763
 
                
764
 
        unlock_(this);
765
 
        
766
 
        exit_();
767
 
}
768
 
 
769
 
void  MSTrans::txn_AddTransaction(uint8_t tran_type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
770
 
{
771
 
        MSTransRec rec = {0,0,0,0,0,0,0}; // This must be set to zero so that the checksum will be valid. 
772
 
        MSDiskTransRec drec;
773
 
        uint64_t new_offset = txn_EOL;
774
 
        bool do_flush = true;
775
 
 
776
 
        enter_();
777
 
 
778
 
        
779
 
        // Check that the log is not already full.
780
 
        if (txn_IsFull()) {             
781
 
                if (!txn_HaveOverflow) { // The first overflow record: update the header.
782
 
                        CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_OVERFLOW);
783
 
                        txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
784
 
 
785
 
                        CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
786
 
                        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
787
 
                        txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
788
 
 
789
 
                        txn_File->flush();
790
 
                        txn_File->sync();
791
 
                        txn_HaveOverflow = true;
792
 
                        txn_OverflowCount++;
793
 
                        txn_Overflow =  txn_MaxRecords;         
794
 
                }
795
 
                
796
 
                new_offset = txn_Overflow;
797
 
        }
798
 
                
799
 
        rec.tr_id = self->myTID ;
800
 
        rec.tr_type = tran_type;
801
 
        rec.tr_db_id = db_id;
802
 
        rec.tr_tab_id = tab_id;
803
 
        rec.tr_blob_id = blob_id;
804
 
        rec.tr_blob_ref_id = blob_ref_id;
805
 
        
806
 
        if (self->myStartTxn) {
807
 
                TRANS_SET_START(rec.tr_type);
808
 
                self->myStartTxn = false;
809
 
        }
810
 
                
811
 
        if (autocommit) {
812
 
                TRANS_SET_AUTOCOMMIT(rec.tr_type);
813
 
        }
814
 
                
815
 
#ifdef TRACE_ALL
816
 
if (txn_debug_log){
817
 
char *ttype, *cmt;
818
 
switch (TRANS_TYPE(rec.tr_type)) {
819
 
        case MS_ReferenceTxn:
820
 
                ttype = "+";
821
 
                break;
822
 
        case MS_DereferenceTxn:
823
 
                ttype = "-";
824
 
                break;
825
 
        case MS_RollBackTxn:
826
 
                ttype = "rb";
827
 
                rec.tr_blob_ref_id = 0;
828
 
                break;
829
 
        case MS_RecoveredTxn:
830
 
                ttype = "rcov";
831
 
                rec.tr_blob_ref_id = 0;
832
 
                break;
833
 
        default:
834
 
                ttype = "???";
835
 
}
836
 
 
837
 
if (TRANS_IS_TERMINATED(rec.tr_type))
838
 
        cmt = "c";
839
 
else
840
 
        cmt = "";
841
 
                        
842
 
fprintf(txn_debug_log, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" %"PRIu64" %"PRIu64"  %"PRIu64" %d\n", self->myTID, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, txn_Start, txn_EOL, new_offset, txn_HaveOverflow);
843
 
}
844
 
#endif
845
 
 
846
 
        rec.tr_check = txn_Checksum; 
847
 
        
848
 
        // Calculate the records checksum.
849
 
        rec.tr_check = checksum((uint8_t*)&rec, sizeof(rec));
850
 
        
851
 
        // Write the record to disk.
852
 
        SET_DISK_TRANSREC(&drec, &rec);
853
 
#ifdef CRASH_TEST
854
 
 
855
 
        if (trans_test_crash_point == 9) { // do a partial write before crashing
856
 
                txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec)/2 );
857
 
                CRASH_POINT(9);
858
 
        } else
859
 
                txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
860
 
#else   
861
 
        txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
862
 
#endif
863
 
CRASH_POINT(3);
864
 
        // There is no need to sync if the transaction is still running.
865
 
        if (TRANS_IS_TERMINATED(tran_type)) {
866
 
                CRASH_POINT(4); // This crash will result in a verify error because the txn was committed to the log but not the database.
867
 
                txn_File->flush();
868
 
                txn_File->sync();
869
 
                do_flush = false;
870
 
        }
871
 
        
872
 
        if (!txn_HaveOverflow) { // No need to update the header if overflowing.
873
 
                uint64_t rec_offset = txn_EOL;
874
 
                
875
 
                txn_EOL = new_offset;
876
 
                txn_EOL++;
877
 
                
878
 
                if (txn_EOL == txn_MaxRecords) {
879
 
                        // The eol has rolled over.
880
 
                        txn_EOL = 0;            
881
 
                }
882
 
 
883
 
                txn_EOLCheckPoint--;
884
 
                if ((!txn_EOLCheckPoint) || !txn_EOL) {
885
 
                        
886
 
                        // Flush the previouse write if required before updating the header.
887
 
                        // This is just in case it crashes during the sync to make sure that the
888
 
                        // header information is correct for the data on disk. If the crash occurred
889
 
                        // between writing the header and the record the header on disk would be wrong.
890
 
                        if (do_flush) {
891
 
                                txn_File->flush();
892
 
                                txn_File->sync();
893
 
                        }
894
 
                        
895
 
                        txn_ResetEOL();
896
 
                }
897
 
                
898
 
                txn_TransCache->tc_AddRec(rec_offset, &rec, self->myTransRef);
899
 
                
900
 
                if (txn_GetNumRecords() > txn_HighWaterMark)
901
 
                        txn_HighWaterMark = txn_GetNumRecords();
902
 
                        
903
 
        } else { // Ovewrflow
904
 
                txn_TransCache->tc_AddRec(txn_Overflow, &rec, self->myTransRef);
905
 
                txn_Overflow++;
906
 
                if (txn_Overflow > txn_HighWaterMark)
907
 
                        txn_HighWaterMark = txn_Overflow;
908
 
        }
909
 
        
910
 
        ASSERT(txn_EOL < txn_MaxRecords);
911
 
        ASSERT(txn_Start < txn_MaxRecords);
912
 
        exit_();
913
 
}
914
 
 
915
 
uint64_t MSTrans::txn_GetSize()         
916
 
{
917
 
        return sizeof(MSDiskTransHeadRec) + txn_MaxRecords * sizeof(MSDiskTransRec);
918
 
}
919
 
 
920
 
//---------------
921
 
void MSTrans::txn_NewTransaction()
922
 
923
 
        enter_();
924
 
 
925
 
        self->myTID = 0;        // This will be assigned when the first record is written.
926
 
        
927
 
        exit_();
928
 
}
929
 
 
930
 
//---------------
931
 
void MSTrans::txn_PerformIdleTasks()
932
 
{
933
 
        enter_();
934
 
        
935
 
        if (txn_TransCache->tc_ShoulReloadCache()) {
936
 
                txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload());
937
 
                txn_TransCache->tc_CompleteCacheReload();
938
 
                exit_();
939
 
        }
940
 
        
941
 
        // During backup the reader is suspended. This may need to be changed
942
 
        // if we decide to actually do something here.
943
 
        txn_reader->suspendedWait(1000);
944
 
        exit_();
945
 
}
946
 
 
947
 
//---------------
948
 
void MSTrans::txn_ResetReadPosition(uint64_t pos)
949
 
{       
950
 
        bool rollover = (pos < txn_Start);
951
 
        enter_();
952
 
        
953
 
        if (pos >= txn_MaxRecords) { // Start of overflow
954
 
                lock_(this);
955
 
                
956
 
                // Overflow has occurred and the circular list is now empty 
957
 
                // so expand the list to include the overflow and 
958
 
                // reset txn_Start and txn_EOL
959
 
                txn_Start = txn_MaxRecords;
960
 
                txn_MaxRecords = txn_Overflow;
961
 
                txn_EOL = 0;
962
 
                txn_HaveOverflow = false;
963
 
                txn_Overflow = 0;
964
 
                
965
 
                CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
966
 
                CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
967
 
                txn_File->write(&(txn_DiskHeader.th_overflow_1),        offsetof(MSDiskTransHeadRec, th_overflow_1),    1);
968
 
                txn_File->write(&(txn_DiskHeader.th_list_size_8),       offsetof(MSDiskTransHeadRec, th_list_size_8),   8);
969
 
                                
970
 
                txn_ResetEOL();
971
 
                
972
 
                unlock_(this);
973
 
        } else
974
 
                txn_Start = pos;        
975
 
        
976
 
        ASSERT(txn_Start <= txn_MaxRecords);
977
 
        
978
 
        if (!rollover)
979
 
                txn_StartCheckPoint -= (pos - txn_Start);
980
 
        
981
 
        // Flush the header if the read position has rolled over or it is time. 
982
 
        if ( rollover || (txn_StartCheckPoint <=0)) {
983
 
                lock_(this);
984
 
                CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
985
 
                CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
986
 
                txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
987
 
CRASH_POINT(5);
988
 
                txn_File->flush();
989
 
                txn_File->sync();
990
 
                txn_StartCheckPoint = txn_MaxCheckPoint;
991
 
                unlock_(this);
992
 
        }
993
 
        
994
 
CRASH_POINT(6);
995
 
        
996
 
        if (TRANS_CAN_RESIZE) 
997
 
                txn_ResizeLog();
998
 
                
999
 
        exit_();
1000
 
}
1001
 
//---------------
1002
 
bool MSTrans::txn_haveNextTransaction() 
1003
 
{
1004
 
        bool terminated = false;
1005
 
        TRef ref;
1006
 
        
1007
 
        txn_TransCache->tc_GetTransaction(&ref, &terminated);
1008
 
        
1009
 
        return terminated;
1010
 
}
1011
 
 
1012
 
//---------------
1013
 
void MSTrans::txn_GetNextTransaction(MSTransPtr tran, MS_TxnState *state)
1014
 
{
1015
 
        bool terminated;
1016
 
        uint64_t log_position;
1017
 
        enter_();
1018
 
        
1019
 
        ASSERT(txn_reader == self);
1020
 
        lock_(txn_reader);
1021
 
        
1022
 
        do {
1023
 
                // Get the next completed transaction.
1024
 
                // this will suspend the current thread, which is assumed
1025
 
                // to be the log reader, until one is available.
1026
 
                while ((!txn_IsTxnValid) && !self->myMustQuit) {
1027
 
 
1028
 
                        // wait until backup has completed.
1029
 
                        while (txn_Doingbackup && !self->myMustQuit)
1030
 
                                txn_PerformIdleTasks();
1031
 
        
1032
 
                        if (txn_TransCache->tc_GetTransaction(&txn_CurrentTxn, &terminated) && terminated) {
1033
 
                                txn_IsTxnValid = true;
1034
 
                                txn_TxnIndex = 0;
1035
 
                        } else
1036
 
                                txn_PerformIdleTasks();
1037
 
                }
1038
 
                
1039
 
                if (self->myMustQuit)
1040
 
                        break;
1041
 
                        
1042
 
                if (txn_TransCache->tc_GetRecAt(txn_CurrentTxn, txn_TxnIndex++, tran, state)) 
1043
 
                        break;
1044
 
                        
1045
 
CRASH_POINT(7);
1046
 
                txn_TransCache->tc_FreeTransaction(txn_CurrentTxn);
1047
 
CRASH_POINT(8);
1048
 
                if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) {
1049
 
                        txn_ResetReadPosition(log_position);
1050
 
                }else{
1051
 
                        if (txn_TransCache->tc_ShoulReloadCache()) {
1052
 
                                uint64_t pos = txn_TransCache->tc_StartCacheReload();
1053
 
                                txn_ResetReadPosition(pos);
1054
 
                                txn_LoadTransactionCache(pos);
1055
 
                                txn_TransCache->tc_CompleteCacheReload();
1056
 
                        } else {
1057
 
                                // Lock the object to prevent writer thread updates while I check again.
1058
 
                                // This is to ensure that txn_EOL is not changed between the call to
1059
 
                                // tc_GetTransactionStartPosition() and setting the read position.
1060
 
                                lock_(this);
1061
 
                                if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) 
1062
 
                                        txn_ResetReadPosition(log_position);
1063
 
                                else
1064
 
                                        txn_ResetReadPosition(txn_EOL);
1065
 
                                unlock_(this);
1066
 
                        }
1067
 
                }
1068
 
                
1069
 
                txn_IsTxnValid = false;
1070
 
                        
1071
 
        } while (1);
1072
 
        
1073
 
        unlock_(txn_reader);
1074
 
        exit_();
1075
 
}
1076
 
 
1077
 
 
1078
 
void MSTrans::txn_GetStats(MSTransStatsPtr stats)
1079
 
{
1080
 
        
1081
 
        if (txn_HaveOverflow) {
1082
 
                stats->ts_IsOverflowing = true;
1083
 
                stats->ts_LogSize = txn_Overflow;
1084
 
        } else {
1085
 
                stats->ts_IsOverflowing = false;
1086
 
                stats->ts_LogSize = txn_GetNumRecords();
1087
 
        }
1088
 
        stats->ts_PercentFull = (stats->ts_LogSize * 100) / CS_GET_DISK_8(txn_DiskHeader.th_requested_list_size_8);
1089
 
 
1090
 
        stats->ts_MaxSize = txn_HighWaterMark;
1091
 
        stats->ts_OverflowCount = txn_OverflowCount;
1092
 
        
1093
 
        stats->ts_TransCacheSize = txn_TransCache->tc_GetCacheUsed();
1094
 
        stats->ts_PercentTransCacheUsed = txn_TransCache->tc_GetPercentCacheUsed();
1095
 
        stats->ts_PercentCacheHit = txn_TransCache->tc_GetPercentCacheHit();
1096
 
}
1097
 
 
1098
 
void MSTrans::txn_SetCacheSize(uint32_t new_size)
1099
 
{
1100
 
        enter_();
1101
 
        // Important lock order. Writer threads never lock the reader but the reader
1102
 
        // may lock this object so always lock the reader first.
1103
 
        lock_(txn_reader);
1104
 
        lock_(this);
1105
 
 
1106
 
        CS_SET_DISK_4(txn_DiskHeader.th_requested_cache_size_4, new_size);
1107
 
        
1108
 
        txn_File->write(&(txn_DiskHeader.th_requested_cache_size_4), offsetof(MSDiskTransHeadRec, th_requested_cache_size_4), 4);
1109
 
        txn_File->flush();
1110
 
        txn_File->sync();
1111
 
 
1112
 
        txn_TransCache->tc_SetSize(new_size);
1113
 
 
1114
 
        unlock_(this);
1115
 
        unlock_(txn_reader);
1116
 
        exit_();
1117
 
}
1118
 
 
1119
 
void MSTrans::txn_SetLogSize(uint64_t new_size)
1120
 
{
1121
 
        enter_();
1122
 
        
1123
 
        // Important lock order. Writer threads never lock the reader but the reader
1124
 
        // may lock this object so always lock the reader first.
1125
 
        lock_(txn_reader);
1126
 
        lock_(this);
1127
 
        
1128
 
        txn_ReqestedMaxRecords = (new_size - sizeof(MSDiskTransHeadRec)) / sizeof(MSDiskTransRec);
1129
 
        
1130
 
        if (txn_ReqestedMaxRecords < 10)
1131
 
                txn_ReqestedMaxRecords = 10;
1132
 
        
1133
 
        CS_SET_DISK_8(txn_DiskHeader.th_requested_list_size_8, txn_ReqestedMaxRecords);
1134
 
        
1135
 
        txn_File->write(&(txn_DiskHeader.th_requested_list_size_8), offsetof(MSDiskTransHeadRec, th_requested_list_size_8), 8);
1136
 
        txn_File->flush();
1137
 
        txn_File->sync();
1138
 
        
1139
 
        unlock_(this);
1140
 
        unlock_(txn_reader);
1141
 
        
1142
 
        exit_();
1143
 
}
1144
 
 
1145
 
// A helper class for resetting database IDs in the transaction log.
1146
 
class DBSearchTXNLog : ReadTXNLog {
1147
 
        public:
1148
 
        DBSearchTXNLog(MSTrans *log): ReadTXNLog(log), sdb_db_id(0), sdb_isDirty(false) {}
1149
 
        
1150
 
        uint32_t sdb_db_id; 
1151
 
        bool sdb_isDirty;
1152
 
        
1153
 
        virtual bool rl_CanContinue() { return true;}
1154
 
        virtual void rl_Load(uint64_t log_position, MSTransPtr rec) 
1155
 
        {
1156
 
                if  (rec->tr_db_id == sdb_db_id) {
1157
 
                        sdb_isDirty = true;
1158
 
                        rec->tr_db_id = 0;
1159
 
                        rl_Store(log_position, rec);
1160
 
                } 
1161
 
        }
1162
 
        
1163
 
        void SetDataBaseIDToZero(uint32_t db_id)
1164
 
        {
1165
 
                sdb_db_id = db_id;
1166
 
                rl_ReadLog(rl_log->txn_GetStartPosition(), false);
1167
 
                if (sdb_isDirty)
1168
 
                        rl_Flush();
1169
 
        }
1170
 
};
1171
 
 
1172
 
// Dropping the database from the transaction log just involves
1173
 
// scanning the log and setting the database id of any transactions 
1174
 
// involving the dropped database to zero.
1175
 
void MSTrans::txn_dropDatabase(uint32_t db_id)
1176
 
{
1177
 
        enter_();
1178
 
        
1179
 
        // Important lock order. Writer threads never lock the reader but the reader
1180
 
        // may lock this object so always lock the reader first.
1181
 
        lock_(txn_reader);
1182
 
        lock_(this);
1183
 
        
1184
 
        // Clear any transaction records in the cache for the dropped database;
1185
 
        txn_TransCache->tc_dropDatabase(db_id);
1186
 
        
1187
 
        // Scan the log setting the database ID for any record belonging to the
1188
 
        // dropped database to zero. 
1189
 
        DBSearchTXNLog searchLog(this);
1190
 
        
1191
 
        searchLog.SetDataBaseIDToZero(db_id);
1192
 
                
1193
 
        unlock_(this);
1194
 
        unlock_(txn_reader);
1195
 
        exit_();
1196
 
}
1197
 
 
1198
 
#ifdef DEBUG    
1199
 
void MSTrans::txn_DumpLog(const char *file)
1200
 
{
1201
 
        size_t  size, read_start = 0;
1202
 
        FILE *fptr;
1203
 
        enter_();
1204
 
        
1205
 
        fptr = fopen(file, "w+");
1206
 
        if (!fptr) {
1207
 
                perror(file);
1208
 
                return;
1209
 
        }
1210
 
        
1211
 
        if (txn_Overflow)
1212
 
                size = txn_Overflow;
1213
 
        else
1214
 
                size = txn_MaxRecords;
1215
 
        
1216
 
        // Dump all the records
1217
 
        while (size) {
1218
 
                MSDiskTransRec diskRecords[1000];
1219
 
                uint32_t read_size;
1220
 
                off64_t offset;
1221
 
                
1222
 
                if (size > 1000) 
1223
 
                        read_size = 1000 ;
1224
 
                else
1225
 
                        read_size = size ;
1226
 
                
1227
 
                // Read the next block of records.
1228
 
                offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);              
1229
 
                txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
1230
 
                
1231
 
                for (uint32_t i = 0; i < read_size; i++) {
1232
 
                        const char *ttype, *cmt;
1233
 
                        MSTransRec rec;
1234
 
                        MSDiskTransPtr drec = diskRecords + i;
1235
 
                        GET_DISK_TRANSREC(&rec, drec);
1236
 
                        
1237
 
                        switch (TRANS_TYPE(rec.tr_type)) {
1238
 
                                case MS_ReferenceTxn:
1239
 
                                        ttype = "+";
1240
 
                                        break;
1241
 
                                case MS_DereferenceTxn:
1242
 
                                        ttype = "-";
1243
 
                                        break;
1244
 
                                case MS_RollBackTxn:
1245
 
                                        ttype = "rb";
1246
 
                                        rec.tr_blob_ref_id = 0;
1247
 
                                        break;
1248
 
                                case MS_RecoveredTxn:
1249
 
                                        ttype = "rcov";
1250
 
                                        rec.tr_blob_ref_id = 0;
1251
 
                                        break;
1252
 
                                default:
1253
 
                                        ttype = "???";
1254
 
                        }
1255
 
                        
1256
 
                        if (TRANS_IS_TERMINATED(rec.tr_type))
1257
 
                                cmt = "c";
1258
 
                        else
1259
 
                                cmt = "";
1260
 
                        
1261
 
                        
1262
 
                        fprintf(fptr, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" \t %s %s %s\n", rec.tr_id, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, 
1263
 
                                ((read_start + i) == txn_Start) ? "START":"",
1264
 
                                ((read_start + i) == txn_EOL) ? "EOL":"",
1265
 
                                ((read_start + i) == txn_MaxRecords) ? "OverFlow":""
1266
 
                                );
1267
 
                }
1268
 
                
1269
 
                size -= read_size;
1270
 
                read_start += read_size;
1271
 
        }
1272
 
        fclose(fptr);
1273
 
        exit_();
1274
 
}       
1275
 
 
1276
 
#endif
1277