~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/TransLog_ms.cc

Added the PBMS daemon plugin.

(Augen zu und durch!)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Barry Leslie
 
20
 *
 
21
 * 2009-06-09
 
22
 *
 
23
 * H&G2JCtL
 
24
 *
 
25
 * PBMS transaction handling.
 
26
 *
 
27
 * PBMS uses 1 circular transaction log. All BLOB reference operations are written to this log
 
28
 * and are applied to the repository when committed. There is 1 thread dedicated to reading the 
 
29
 * transaction log and applying the changes. During an engine level backup this thread is suspended 
 
30
 * so that no transactions will be applied to the repository files as they are backed up.
 
31
 *
 
32
 */
 
33
#include "CSConfig.h"
 
34
 
 
35
#include <stdlib.h>
 
36
#include <inttypes.h>
 
37
 
 
38
#include "CSGlobal.h"
 
39
#include "CSStrUtil.h"
 
40
#include "CSStorage.h"
 
41
 
 
42
#include "TransLog_ms.h"
 
43
#include "TransCache_ms.h"
 
44
 
 
45
#ifdef CRASH_TEST
 
46
uint32_t        trans_test_crash_point;
 
47
#define CRASH_POINT(p) { if (p == trans_test_crash_point) { char *ptr = NULL; printf("Crash on demand at: %s(%d), start: %"PRIu64", eol: %"PRIu64"\n", __FILE__, __LINE__, txn_Start, txn_EOL); *ptr = 88;}}
 
48
#else
 
49
#define CRASH_POINT(p)
 
50
#endif
 
51
 
 
52
#define MS_TRANS_LOG_MAGIC                      0xA6E7D7B3
 
53
#define MS_TRANS_LOG_VERSION            1
 
54
#define MS_TRANS_LOG_RECOVERED          0XA1
 
55
#define MS_TRANS_LOG_NOT_RECOVERED      0XA2
 
56
#define MS_TRANS_NO_OVERFLOW            0XB1
 
57
#define MS_TRANS_OVERFLOW                       0XB2
 
58
 
 
59
#define DFLT_TRANS_CHECKPOINT_THRESHOLD 1024
 
60
 
 
61
#define DFLT_TRANS_LOG_LIST_SIZE        (1024 * 10)
 
62
#define DFLT_TRANS_CACHE_SIZE           (500)
 
63
 
 
64
#define TRANS_CAN_RESIZE        ((txn_MaxRecords != txn_ReqestedMaxRecords) && (txn_EOL >= txn_Start) && !txn_HaveOverflow)
 
65
 
 
66
typedef struct MSDiskTrans {
 
67
        CSDiskValue4    dtr_id_4;                       // The transaction ID
 
68
        CSDiskValue1    dtr_type_1;                     // The transaction type. If the first bit is set then the transaction is an autocommit.
 
69
        CSDiskValue1    dtr_check_1;            // The trransaction record checksum.
 
70
        CSDiskValue4    dtr_db_id_4;            // The database ID for the operation.
 
71
        CSDiskValue4    dtr_tab_id_4;           // The table ID for the operation.
 
72
        CSDiskValue8    dtr_blob_id_8;          // The blob ID for the operation.
 
73
        CSDiskValue8    dtr_blob_ref_id_8;      // The blob reference id.
 
74
} MSDiskTransRec, *MSDiskTransPtr;
 
75
 
 
76
#define SET_DISK_TRANSREC(d, s) { \
 
77
        CS_SET_DISK_4((d)->dtr_id_4, (s)->tr_id);\
 
78
        CS_SET_DISK_1((d)->dtr_type_1, (s)->tr_type);\
 
79
        CS_SET_DISK_1((d)->dtr_check_1, (s)->tr_check);\
 
80
        CS_SET_DISK_4((d)->dtr_db_id_4, (s)->tr_db_id);\
 
81
        CS_SET_DISK_4((d)->dtr_tab_id_4, (s)->tr_tab_id);\
 
82
        CS_SET_DISK_8((d)->dtr_blob_id_8, (s)->tr_blob_id);\
 
83
        CS_SET_DISK_8((d)->dtr_blob_ref_id_8, (s)->tr_blob_ref_id);\
 
84
}
 
85
 
 
86
#define GET_DISK_TRANSREC(s, d) { \
 
87
        (s)->tr_id = CS_GET_DISK_4((d)->dtr_id_4);\
 
88
        (s)->tr_type = CS_GET_DISK_1((d)->dtr_type_1);\
 
89
        (s)->tr_check = CS_GET_DISK_1((d)->dtr_check_1);\
 
90
        (s)->tr_db_id = CS_GET_DISK_4((d)->dtr_db_id_4);\
 
91
        (s)->tr_tab_id = CS_GET_DISK_4((d)->dtr_tab_id_4);\
 
92
        (s)->tr_blob_id = CS_GET_DISK_8((d)->dtr_blob_id_8);\
 
93
        (s)->tr_blob_ref_id = CS_GET_DISK_8((d)->dtr_blob_ref_id_8);\
 
94
}
 
95
 
 
96
static uint8_t checksum(uint8_t *data, size_t len)
 
97
{
 
98
        register uint32_t       sum = 0, g;
 
99
        uint8_t                         *chk;
 
100
 
 
101
        chk = data + len - 1;
 
102
        while (chk > data) {
 
103
                sum = (sum << 4) + *chk;
 
104
                if ((g = sum & 0xF0000000)) {
 
105
                        sum = sum ^ (g >> 24);
 
106
                        sum = sum ^ g;
 
107
                }
 
108
                chk--;
 
109
        }
 
110
        return (uint8_t) (sum ^ (sum >> 24) ^ (sum >> 16) ^ (sum >> 8));
 
111
}
 
112
 
 
113
MSTrans::MSTrans() : 
 
114
        CSSharedRefObject(),
 
115
        txn_TransCache(NULL),
 
116
        txn_IsTxnValid(false),
 
117
        txn_HaveOverflow(false),
 
118
        txn_OverflowCount(0),
 
119
        txn_HighWaterMark(0),
 
120
        txn_Overflow(0),
 
121
        txn_reader(NULL),
 
122
        txn_StartCheckPoint(0),
 
123
        txn_EOLCheckPoint(0),
 
124
        txn_File(NULL),
 
125
        txn_MaxTID(0),
 
126
        txn_Recovered(false),
 
127
        txn_Doingbackup(false)
 
128
{
 
129
}
 
130
 
 
131
MSTrans::~MSTrans()
 
132
{
 
133
        txn_Close();
 
134
        if (txn_TransCache) 
 
135
                txn_TransCache->release();
 
136
                
 
137
}       
 
138
 
 
139
void MSTrans::txn_Close()
 
140
{
 
141
        
 
142
        if (txn_File) {         
 
143
                // Set the header to indicate that the log has not been closed properly. 
 
144
                CS_SET_DISK_4(txn_DiskHeader.th_next_txn_id_4, txn_MaxTID);
 
145
                txn_File->write(&(txn_DiskHeader.th_next_txn_id_4), offsetof(MSDiskTransHeadRec, th_next_txn_id_4), 4 );
 
146
 
 
147
                CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
148
                CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
149
                CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
 
150
                txn_File->write(&(txn_DiskHeader.th_start_8), 
 
151
                                                        offsetof(MSDiskTransHeadRec, th_start_8), 
 
152
                                                        sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
 
153
CRASH_POINT(1);
 
154
                txn_File->flush();
 
155
                txn_File->sync();
 
156
 
 
157
                if (txn_Recovered) {
 
158
                        // Write the recovered flag seperately just incase of a crash during the write operation.
 
159
                        CS_SET_DISK_1(txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);           
 
160
                        txn_File->write(&(txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1 );
 
161
                        txn_File->flush();
 
162
                        txn_File->sync();
 
163
                }
 
164
                
 
165
                txn_File->close();
 
166
                txn_File->release();
 
167
                txn_File = NULL;                
 
168
        }
 
169
}
 
170
void MSTrans::txn_SetFile(CSFile *tr_file)
 
171
{
 
172
        txn_File = tr_file;
 
173
}
 
174
 
 
175
//#define TRACE_ALL
 
176
#ifdef TRACE_ALL
 
177
static FILE *txn_debug_log;
 
178
#endif
 
179
 
 
180
MSTrans *MSTrans::txn_NewMSTrans(const char *log_path, bool dump_log)
 
181
{
 
182
        MSTrans *trans = NULL;
 
183
        CSPath *path = NULL;
 
184
        uint64_t log_size;
 
185
        enter_();
 
186
 
 
187
        new_(trans, MSTrans());
 
188
        push_(trans);
 
189
 
 
190
        path = CSPath::newPath(log_path);
 
191
        push_(path);    
 
192
                
 
193
try_again:
 
194
        
 
195
        
 
196
        if (!path->exists()) { // Create the transaction log.   
 
197
                CSFile *tr_file = path->createFile(CSFile::CREATE);
 
198
                push_(tr_file);
 
199
                
 
200
                log_size = DFLT_TRANS_LOG_LIST_SIZE * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
 
201
 
 
202
                // Preallocate the log space and initialize it.
 
203
                MSDiskTransRec recs[1024] = {0};
 
204
                off_t offset = sizeof(MSDiskTransHeadRec);
 
205
                uint64_t num_records = DFLT_TRANS_LOG_LIST_SIZE;
 
206
                size_t size;
 
207
                
 
208
                while (num_records) {
 
209
                        if (num_records < 1024)
 
210
                                size = num_records;
 
211
                        else
 
212
                                size = 1024;
 
213
                        tr_file->write(recs, offset, size * sizeof(MSDiskTransRec));
 
214
                        offset += size * sizeof(MSDiskTransRec);
 
215
                        num_records -= size;
 
216
                }
 
217
 
 
218
                trans->txn_MaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
 
219
                trans->txn_ReqestedMaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
 
220
                trans->txn_MaxCheckPoint = DFLT_TRANS_CHECKPOINT_THRESHOLD;
 
221
                trans->txn_MaxTID = 1;
 
222
 
 
223
                // Initialize the log header.
 
224
                CS_SET_DISK_4(trans->txn_DiskHeader.th_magic_4, MS_TRANS_LOG_MAGIC);
 
225
                CS_SET_DISK_2(trans->txn_DiskHeader.th_version_2, MS_TRANS_LOG_VERSION);
 
226
                
 
227
                CS_SET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4, trans->txn_MaxTID);
 
228
 
 
229
                CS_SET_DISK_2(trans->txn_DiskHeader.th_check_point_2, trans->txn_MaxCheckPoint);
 
230
 
 
231
                CS_SET_DISK_8(trans->txn_DiskHeader.th_list_size_8, trans->txn_MaxRecords);
 
232
                CS_SET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8, trans->txn_ReqestedMaxRecords);
 
233
 
 
234
                CS_SET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4, DFLT_TRANS_CACHE_SIZE);
 
235
                
 
236
                CS_SET_DISK_8(trans->txn_DiskHeader.th_start_8, 0);
 
237
                CS_SET_DISK_8(trans->txn_DiskHeader.th_eol_8, 0);
 
238
                
 
239
                CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
 
240
                CS_SET_DISK_1(trans->txn_DiskHeader.th_checksum_1, 1);
 
241
                CS_SET_DISK_1(trans->txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
 
242
                
 
243
                tr_file->write(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec));
 
244
                pop_(tr_file);
 
245
                trans->txn_SetFile(tr_file);
 
246
                
 
247
                trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
 
248
                
 
249
                trans->txn_TransCache = MSTransCache::newMSTransCache(DFLT_TRANS_CACHE_SIZE);
 
250
        } else { // The transaction log already exists
 
251
                bool overflow, recovered;
 
252
                
 
253
                CSFile *tr_file = path->createFile(CSFile::DEFAULT); // Open read/write
 
254
                push_(tr_file);
 
255
                
 
256
                // Read the log header:
 
257
                if (tr_file->read(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec), 0) < sizeof(MSDiskTransHeadRec)) {
 
258
                        release_(tr_file);
 
259
                        path->removeFile();
 
260
                        goto try_again;
 
261
                }
 
262
                
 
263
                // check the log header:                
 
264
                if (CS_GET_DISK_4(trans->txn_DiskHeader.th_magic_4) != MS_TRANS_LOG_MAGIC)
 
265
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_HEADER_MAGIC);
 
266
                
 
267
                if (CS_GET_DISK_2(trans->txn_DiskHeader.th_version_2) != MS_TRANS_LOG_VERSION)
 
268
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_VERSION_TOO_NEW);
 
269
                        
 
270
                //----
 
271
                if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_NO_OVERFLOW) 
 
272
                        overflow = false;
 
273
                else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_OVERFLOW) 
 
274
                        overflow = true;
 
275
                else 
 
276
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
 
277
                
 
278
                //----
 
279
                if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_NOT_RECOVERED) 
 
280
                        recovered = false;
 
281
                else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_RECOVERED) 
 
282
                        recovered = true;
 
283
                else 
 
284
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
 
285
 
 
286
                // Check that the log is the expected size.
 
287
                log_size = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8) * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
 
288
                
 
289
                if ((log_size > tr_file->getEOF()) || 
 
290
                        ((log_size < tr_file->getEOF()) && !overflow)){ 
 
291
                                        
 
292
                        char buffer[CS_EXC_MESSAGE_SIZE];               
 
293
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unexpected transaction log size: ");
 
294
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, path->getCString());
 
295
                        CSException::throwException(CS_CONTEXT, CS_ERR_BAD_FILE_HEADER, buffer);
 
296
                }
 
297
                
 
298
                trans->txn_MaxTID = CS_GET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4);
 
299
 
 
300
                // Looks good, we will assume it is a valid log file.
 
301
                trans->txn_TransCache = MSTransCache::newMSTransCache(CS_GET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4));
 
302
 
 
303
                pop_(tr_file);
 
304
                trans->txn_SetFile(tr_file);
 
305
 
 
306
                trans->txn_MaxCheckPoint = CS_GET_DISK_2(trans->txn_DiskHeader.th_check_point_2);
 
307
                
 
308
                trans->txn_MaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8);
 
309
                trans->txn_ReqestedMaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8);
 
310
                
 
311
                trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
 
312
                trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
 
313
                trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
 
314
                trans->txn_HaveOverflow = overflow;
 
315
                if (overflow) 
 
316
                        trans->txn_Overflow = (tr_file->getEOF() - sizeof(MSDiskTransHeadRec)) /sizeof(MSDiskTransRec); 
 
317
                else
 
318
                        trans->txn_Overflow = 0;
 
319
 
 
320
#ifdef DEBUG
 
321
                if (overflow)
 
322
                        printf("Recovering overflow log\n");
 
323
                if (dump_log) {
 
324
                        char name[100];
 
325
                        snprintf(name, 100, "%dms-trans-log.dump", time(NULL));
 
326
                        trans->txn_DumpLog(name);
 
327
                }
 
328
#endif
 
329
                // Recover the log if required.
 
330
                if (!recovered)
 
331
                        trans->txn_Recover();
 
332
                        
 
333
                        
 
334
        }
 
335
        
 
336
        trans->txn_Recovered = true; // Any recovery required has been completed.
 
337
 
 
338
        // The log has been recovered so these values should be valid:
 
339
        trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
 
340
        trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
 
341
        
 
342
        // Set the header to indicate that the log has not been closed properly. 
 
343
        // This is reset when the log is closed during shutdown.
 
344
        CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_NOT_RECOVERED);
 
345
        trans->txn_File->write(&(trans->txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1);
 
346
        
 
347
        // Load the transaction records into memory.
 
348
        trans->txn_TransCache->tc_StartCacheReload(true);
 
349
        trans->txn_LoadTransactionCache(trans->txn_Start);
 
350
        trans->txn_TransCache->tc_CompleteCacheReload();
 
351
        
 
352
        if (trans->txn_MaxRecords != trans->txn_ReqestedMaxRecords) 
 
353
                trans->txn_ResizeLog(); // Try to resize but it may not be possible yet.
 
354
        
 
355
        release_(path);
 
356
        pop_(trans);
 
357
 
 
358
#ifdef TRACE_ALL
 
359
        
 
360
        txn_debug_log = fopen("log_dump.txt", "w+");
 
361
        if (!txn_debug_log) {
 
362
                perror("log_dump.txt");
 
363
        }
 
364
#endif
 
365
        
 
366
        return_(trans);
 
367
}
 
368
 
 
369
bool MSTrans::txn_ValidRecord(MSTransPtr rec)
 
370
{
 
371
        uint8_t check = rec->tr_check;
 
372
        bool ok;
 
373
        
 
374
        rec->tr_check = txn_Checksum;
 
375
        ok = (checksum((uint8_t*)rec, sizeof(MSTransRec)) == check);
 
376
        rec->tr_check = check;
 
377
        return ok;
 
378
}
 
379
 
 
380
void MSTrans::txn_GetRecordAt(uint64_t index, MSTransPtr rec)
 
381
{
 
382
        MSDiskTransRec drec;
 
383
        off_t offset;
 
384
        
 
385
        // Read 1 record from the log and convert it from disk format.
 
386
        offset = sizeof(MSDiskTransHeadRec) + index * sizeof(MSDiskTransRec);           
 
387
        txn_File->read(&drec, offset, sizeof(MSDiskTransRec), sizeof(MSDiskTransRec));
 
388
        GET_DISK_TRANSREC(rec, &drec);
 
389
}
 
390
 
 
391
// Recovery involves finding the start of the first record and the eof
 
392
// position. The positions will be found at or after the position stored
 
393
// in the header. 
 
394
void MSTrans::txn_Recover()
 
395
{
 
396
        MSTransRec rec = {0};
 
397
        uint64_t original_eol = txn_EOL;
 
398
        enter_();
 
399
 
 
400
#ifdef DEBUG
 
401
printf("Recovering      transaction log!\n");
 
402
#endif
 
403
 
 
404
        txn_MaxTID = 0;
 
405
        // Search for the last valid record in the log starting from the last 
 
406
        // known position stored in the header.
 
407
        for (; txn_EOL < txn_MaxRecords; txn_EOL++) {
 
408
                txn_GetRecordAt(txn_EOL, &rec);
 
409
                if (! txn_ValidRecord(&rec))
 
410
                        break;  
 
411
        }
 
412
        
 
413
        if (txn_EOL == txn_MaxRecords) {
 
414
                // It looks like all the records in the log are valid?
 
415
                // This is strange but could happen if the crash
 
416
                // occurred just before updating the header as the
 
417
                // eol position rolled over to the top of the log.
 
418
                txn_EOL = 0;
 
419
        }
 
420
        
 
421
        txn_MaxTID++;
 
422
        
 
423
        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
424
        
 
425
        // If the actual eol has moved pass the recorded start position
 
426
        // then the actuall start position must be some where beyond
 
427
        // the eol.
 
428
        if (((original_eol < txn_Start) || (original_eol > txn_EOL)) && (txn_EOL >= txn_Start)) 
 
429
                txn_Start = txn_EOL +1; 
 
430
 
 
431
        // Position the start at the beginning of a transaction.        
 
432
        uint64_t end_search = (txn_Start < txn_EOL)? txn_EOL : txn_MaxRecords;
 
433
        for (; txn_Start < end_search; txn_Start++) {
 
434
                txn_GetRecordAt(txn_Start, &rec);
 
435
                if (TRANS_IS_START(rec.tr_type))
 
436
                        break;  
 
437
        }
 
438
 
 
439
        if (txn_Start == end_search)
 
440
                txn_Start = txn_EOL; 
 
441
                
 
442
        CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
443
        
 
444
        txn_TransCache->tc_SetRecovering(true); 
 
445
        // Load the transaction records into the cache.
 
446
        txn_TransCache->tc_StartCacheReload(true);
 
447
        txn_LoadTransactionCache(txn_Start);
 
448
        txn_TransCache->tc_CompleteCacheReload();
 
449
        
 
450
        // Now go through all the transactions and add rollbacks for any
 
451
        // unterminated transactions.
 
452
        TRef ref;
 
453
        bool terminated;
 
454
        while  (txn_TransCache->tc_GetTransaction(&ref, &terminated)) {
 
455
                
 
456
                txn_MaxTID = txn_TransCache->tc_GetTransactionID(ref); // Save the TID of the last transaction.
 
457
                if (!terminated) {
 
458
                        self->myTID = txn_MaxTID;
 
459
                        self->myTransRef = ref;
 
460
                        self->myStartTxn = false;
 
461
                        txn_AddTransaction(MS_RecoveredTxn);
 
462
                }
 
463
CRASH_POINT(2);
 
464
                txn_TransCache->tc_FreeTransaction(ref);
 
465
                
 
466
                // Load the next block of transactions into the cache.
 
467
                // This needs to be done after each tc_GetTransaction() to make sure
 
468
                // that if the transaction terminator is some where in the log
 
469
                // it will get read even if the cache is completely full. 
 
470
                if (txn_TransCache->tc_ShoulReloadCache()) {
 
471
                        txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload(true));
 
472
                        txn_TransCache->tc_CompleteCacheReload();
 
473
                }
 
474
        }
 
475
        
 
476
        
 
477
        txn_TransCache->tc_SetRecovering(false); 
 
478
        self->myTransRef = 0;
 
479
        
 
480
        // Update the header again incase rollbacks have been added.
 
481
        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
482
                                
 
483
        exit_();
 
484
}
 
485
 
 
486
bool ReadTXNLog::rl_CanContinue() 
 
487
 
488
        return rl_log->txn_TransCache->tc_ContinueCacheReload();
 
489
}
 
490
 
 
491
void ReadTXNLog::rl_Load(uint64_t log_position, MSTransPtr rec) 
 
492
{
 
493
        rl_log->txn_TransCache->tc_AddRec(log_position, rec);
 
494
}
 
495
 
 
496
void ReadTXNLog::rl_Store(uint64_t log_position, MSTransPtr rec) 
 
497
{
 
498
        MSDiskTransRec drec;
 
499
        SET_DISK_TRANSREC(&drec, rec);
 
500
 
 
501
        rl_log->txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + log_position * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec));
 
502
}
 
503
 
 
504
void ReadTXNLog::rl_Flush() 
 
505
{
 
506
        rl_log->txn_File->flush();
 
507
        rl_log->txn_File->sync();
 
508
}
 
509
 
 
510
void ReadTXNLog::rl_ReadLog(uint64_t read_start, bool log_locked)
 
511
{
 
512
        uint64_t        size, orig_size;
 
513
        bool reading_overflow = (read_start >= rl_log->txn_MaxRecords);
 
514
        enter_();
 
515
        
 
516
 
 
517
        // Get the number of transaction records to be loaded.
 
518
        if (reading_overflow) {
 
519
                orig_size = rl_log->txn_Overflow;
 
520
                size = rl_log->txn_Overflow - read_start;
 
521
        } else {
 
522
                orig_size = rl_log->txn_GetNumRecords();
 
523
        
 
524
                if (rl_log->txn_Start <= read_start)
 
525
                        size = orig_size - (read_start - rl_log->txn_Start);
 
526
                else 
 
527
                        size = rl_log->txn_EOL - read_start;
 
528
        }
 
529
        
 
530
        // load all the records
 
531
        while (size && rl_CanContinue()) {
 
532
                MSDiskTransRec diskRecords[1000];
 
533
                uint32_t read_size;
 
534
                off_t offset;
 
535
                
 
536
                if (size > 1000) 
 
537
                        read_size = 1000 ;
 
538
                else
 
539
                        read_size = size ;
 
540
                
 
541
                // Check if we have reached the wrap around point in the log.
 
542
                if ((!reading_overflow) && (rl_log->txn_EOL < read_start) && ((rl_log->txn_MaxRecords - read_start) < read_size))
 
543
                        read_size = rl_log->txn_MaxRecords - read_start ;
 
544
 
 
545
                // Read the next block of records.
 
546
                offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);              
 
547
                rl_log->txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
 
548
                
 
549
                // Convert the records from disk format and add them to the cache.
 
550
                for (uint32_t i = 0; i < read_size && rl_CanContinue(); i++) {
 
551
                        MSTransRec rec;
 
552
                        MSDiskTransPtr drec = diskRecords + i;
 
553
                        GET_DISK_TRANSREC(&rec, drec);
 
554
                        
 
555
                        rl_Load(read_start + i, &rec); 
 
556
                }
 
557
                
 
558
                size -= read_size;
 
559
                read_start += read_size;
 
560
                if (read_start == rl_log->txn_MaxRecords)
 
561
                        read_start = 0;
 
562
        }
 
563
        
 
564
        if (rl_log->txn_HaveOverflow && !reading_overflow) {
 
565
                if (rl_CanContinue()) 
 
566
                        rl_ReadLog(rl_log->txn_MaxRecords, false);
 
567
                
 
568
        } else if (!log_locked) {       
 
569
                // The following is intended to prevent the case where a writer 
 
570
                // writes an txn record while the cache is full but just after 
 
571
                // the reload has completed. If the cache is not yet full we need
 
572
                // to load as many of the new records into cache as possible.
 
573
        
 
574
                uint64_t        new_size;
 
575
                lock_(rl_log);
 
576
                if (reading_overflow)
 
577
                        new_size = rl_log->txn_Overflow;
 
578
                else 
 
579
                        new_size = rl_log->txn_GetNumRecords();
 
580
                if (rl_CanContinue() && (orig_size != new_size)) {
 
581
                        rl_ReadLog(read_start, true);
 
582
                }
 
583
                unlock_(rl_log);
 
584
        }
 
585
        
 
586
 
 
587
        exit_();
 
588
}
 
589
 
 
590
void MSTrans::txn_LoadTransactionCache(uint64_t read_start)
 
591
{
 
592
        ReadTXNLog log(this);
 
593
        enter_();
 
594
        log.rl_ReadLog(read_start, false);
 
595
        txn_TransCache->tc_UpdateCacheVersion(); // Signal writes to recheck cache for overflow txn refs.
 
596
        exit_();
 
597
}
 
598
 
 
599
void  MSTrans::txn_ResizeLog()
 
600
{
 
601
        enter_();
 
602
        
 
603
        lock_(this);
 
604
        if (TRANS_CAN_RESIZE) {
 
605
                // TRANS_CAN_RESIZE checks that there is no overflow and the the start position 
 
606
                // is less than eol. This implies the from eol to the end of file doesn't contain
 
607
                // and used records.
 
608
                
 
609
 
 
610
#ifdef DEBUG    
 
611
                uint64_t old_size = txn_MaxRecords;
 
612
#endif          
 
613
                if (txn_MaxRecords > txn_ReqestedMaxRecords) { // Shrink the log
 
614
                        uint64_t max_resize = txn_MaxRecords - txn_EOL;
 
615
                        
 
616
                        if ( txn_Start == txn_EOL)
 
617
                                max_resize = txn_MaxRecords;
 
618
                        else {
 
619
                                max_resize = txn_MaxRecords - txn_EOL;
 
620
                                if (!txn_Start) // If start is at '0' then the EOL cannot be wrapped.
 
621
                                        max_resize--;
 
622
                        }
 
623
                        
 
624
                                
 
625
                        if (max_resize > (txn_MaxRecords - txn_ReqestedMaxRecords))
 
626
                                max_resize = txn_MaxRecords - txn_ReqestedMaxRecords;
 
627
                                                        
 
628
                        txn_MaxRecords -=       max_resize;
 
629
                } else
 
630
                        txn_MaxRecords = txn_ReqestedMaxRecords; // Grow the log
 
631
 
 
632
#ifdef DEBUG                    
 
633
                char buffer[CS_EXC_MESSAGE_SIZE];               
 
634
                snprintf(buffer, CS_EXC_MESSAGE_SIZE, "Resizing the Transaction log from %"PRIu64" to %"PRIu64" \n",  old_size, txn_MaxRecords);
 
635
                CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, buffer);
 
636
#endif
 
637
 
 
638
                CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
 
639
                
 
640
                txn_File->setEOF(txn_MaxRecords * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec));
 
641
                txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
 
642
                
 
643
                if (txn_Start == txn_EOL) {
 
644
                        txn_Start = 0;
 
645
                        txn_EOL = 0;
 
646
                } else if (txn_MaxRecords == txn_EOL) {
 
647
                        txn_EOL = 0;
 
648
                }
 
649
                
 
650
                txn_ResetEOL();
 
651
                                
 
652
        }       
 
653
        unlock_(this);
 
654
        
 
655
        exit_();
 
656
}
 
657
 
 
658
void  MSTrans::txn_ResetEOL()
 
659
{
 
660
        enter_();
 
661
        
 
662
        txn_EOLCheckPoint = txn_MaxCheckPoint;
 
663
        txn_StartCheckPoint = txn_MaxCheckPoint;
 
664
        
 
665
        if (!txn_EOL)
 
666
                txn_Checksum++;
 
667
        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
668
        CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
669
        CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
 
670
        txn_File->write(&(txn_DiskHeader.th_start_8), 
 
671
                                                offsetof(MSDiskTransHeadRec, th_start_8), 
 
672
                                                sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
 
673
CRASH_POINT(5);
 
674
        txn_File->flush();
 
675
        txn_File->sync();
 
676
CRASH_POINT(10);
 
677
                
 
678
        exit_();
 
679
}
 
680
 
 
681
void MSTrans::txn_LogTransaction(MS_Txn type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
 
682
{
 
683
        enter_();
 
684
        
 
685
        lock_(this);
 
686
        if (!self->myTID) {
 
687
                txn_MaxTID++;
 
688
                self->myTID = txn_MaxTID;
 
689
                self->myTransRef = TRANS_CACHE_NEW_REF;
 
690
                self->myStartTxn = true;
 
691
        }
 
692
 
 
693
        txn_AddTransaction(type, autocommit, db_id, tab_id, blob_id, blob_ref_id);
 
694
        if (autocommit || TRANS_TYPE_IS_TERMINATED(type))
 
695
                txn_NewTransaction();
 
696
                
 
697
        unlock_(this);
 
698
        
 
699
        exit_();
 
700
}
 
701
 
 
702
void  MSTrans::txn_AddTransaction(uint8_t tran_type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
 
703
{
 
704
        MSTransRec rec = {0}; // This must be set to zero so that the checksum will be valid. 
 
705
        MSDiskTransRec drec;
 
706
        uint64_t new_offset = txn_EOL;
 
707
        bool do_flush = true;
 
708
 
 
709
        enter_();
 
710
 
 
711
        
 
712
        // Check that the log is not already full.
 
713
        if (txn_IsFull()) {             
 
714
                if (!txn_HaveOverflow) { // The first overflow record: update the header.
 
715
                        CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_OVERFLOW);
 
716
                        txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
 
717
 
 
718
                        CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
719
                        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
720
                        txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
 
721
 
 
722
                        txn_File->flush();
 
723
                        txn_File->sync();
 
724
                        txn_HaveOverflow = true;
 
725
                        txn_OverflowCount++;
 
726
                        txn_Overflow =  txn_MaxRecords;         
 
727
                }
 
728
                
 
729
                new_offset = txn_Overflow;
 
730
        }
 
731
                
 
732
        rec.tr_id = self->myTID ;
 
733
        rec.tr_type = tran_type;
 
734
        rec.tr_db_id = db_id;
 
735
        rec.tr_tab_id = tab_id;
 
736
        rec.tr_blob_id = blob_id;
 
737
        rec.tr_blob_ref_id = blob_ref_id;
 
738
        
 
739
        if (self->myStartTxn) {
 
740
                TRANS_SET_START(rec.tr_type);
 
741
                self->myStartTxn = false;
 
742
        }
 
743
                
 
744
        if (autocommit) {
 
745
                TRANS_SET_AUTOCOMMIT(rec.tr_type);
 
746
        }
 
747
                
 
748
#ifdef TRACE_ALL
 
749
if (txn_debug_log){
 
750
char *ttype, *cmt;
 
751
switch (TRANS_TYPE(rec.tr_type)) {
 
752
        case MS_ReferenceTxn:
 
753
                ttype = "+";
 
754
                break;
 
755
        case MS_DereferenceTxn:
 
756
                ttype = "-";
 
757
                break;
 
758
        case MS_RollBackTxn:
 
759
                ttype = "rb";
 
760
                rec.tr_blob_ref_id = 0;
 
761
                break;
 
762
        case MS_RecoveredTxn:
 
763
                ttype = "rcov";
 
764
                rec.tr_blob_ref_id = 0;
 
765
                break;
 
766
        default:
 
767
                ttype = "???";
 
768
}
 
769
 
 
770
if (TRANS_IS_TERMINATED(rec.tr_type))
 
771
        cmt = "c";
 
772
else
 
773
        cmt = "";
 
774
                        
 
775
fprintf(txn_debug_log, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" %"PRIu64" %"PRIu64"  %"PRIu64" %d\n", self->myTID, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, txn_Start, txn_EOL, new_offset, txn_HaveOverflow);
 
776
}
 
777
#endif
 
778
 
 
779
        rec.tr_check = txn_Checksum; 
 
780
        
 
781
        // Calculate the records checksum.
 
782
        rec.tr_check = checksum((uint8_t*)&rec, sizeof(rec));
 
783
        
 
784
        // Write the record to disk.
 
785
        SET_DISK_TRANSREC(&drec, &rec);
 
786
#ifdef CRASH_TEST
 
787
 
 
788
        if (trans_test_crash_point == 9) { // do a partial write before crashing
 
789
                txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec)/2 );
 
790
                CRASH_POINT(9);
 
791
        } else
 
792
                txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
 
793
#else   
 
794
        txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
 
795
#endif
 
796
CRASH_POINT(3);
 
797
        // There is no need to sync if the transaction is still running.
 
798
        if (TRANS_IS_TERMINATED(tran_type)) {
 
799
                CRASH_POINT(4); // This crash will result in a verify error because the txn was committed to the log but not the database.
 
800
                txn_File->flush();
 
801
                txn_File->sync();
 
802
                do_flush = false;
 
803
        }
 
804
        
 
805
        if (!txn_HaveOverflow) { // No need to update the header if overflowing.
 
806
                uint64_t rec_offset = txn_EOL;
 
807
                
 
808
                txn_EOL = new_offset;
 
809
                txn_EOL++;
 
810
                
 
811
                if (txn_EOL == txn_MaxRecords) {
 
812
                        // The eol has rolled over.
 
813
                        txn_EOL = 0;            
 
814
                }
 
815
 
 
816
                txn_EOLCheckPoint--;
 
817
                if ((!txn_EOLCheckPoint) || !txn_EOL) {
 
818
                        
 
819
                        // Flush the previouse write if required before updating the header.
 
820
                        // This is just in case it crashes during the sync to make sure that the
 
821
                        // header information is correct for the data on disk. If the crash occurred
 
822
                        // between writing the header and the record the header on disk would be wrong.
 
823
                        if (do_flush) {
 
824
                                txn_File->flush();
 
825
                                txn_File->sync();
 
826
                        }
 
827
                        
 
828
                        txn_ResetEOL();
 
829
                }
 
830
                
 
831
                txn_TransCache->tc_AddRec(rec_offset, &rec, self->myTransRef);
 
832
                
 
833
                if (txn_GetNumRecords() > txn_HighWaterMark)
 
834
                        txn_HighWaterMark = txn_GetNumRecords();
 
835
                        
 
836
        } else { // Ovewrflow
 
837
                txn_TransCache->tc_AddRec(txn_Overflow, &rec, self->myTransRef);
 
838
                txn_Overflow++;
 
839
                if (txn_Overflow > txn_HighWaterMark)
 
840
                        txn_HighWaterMark = txn_Overflow;
 
841
        }
 
842
        
 
843
        ASSERT(txn_EOL < txn_MaxRecords);
 
844
        ASSERT(txn_Start < txn_MaxRecords);
 
845
        exit_();
 
846
}
 
847
 
 
848
uint64_t MSTrans::txn_GetSize()         
 
849
{
 
850
        return sizeof(MSDiskTransHeadRec) + txn_MaxRecords * sizeof(MSDiskTransRec);
 
851
}
 
852
 
 
853
//---------------
 
854
void MSTrans::txn_NewTransaction()
 
855
 
856
        enter_();
 
857
 
 
858
        self->myTID = 0;        // This will be assigned when the first record is written.
 
859
        
 
860
        exit_();
 
861
}
 
862
 
 
863
//---------------
 
864
void MSTrans::txn_PerformIdleTasks()
 
865
{
 
866
        enter_();
 
867
        
 
868
        if (txn_TransCache->tc_ShoulReloadCache()) {
 
869
                txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload());
 
870
                txn_TransCache->tc_CompleteCacheReload();
 
871
                exit_();
 
872
        }
 
873
        
 
874
        // During backup the reader is suspended. This may need to be changed
 
875
        // if we decide to actually do something here.
 
876
        txn_reader->suspendedWait(1000);
 
877
        exit_();
 
878
}
 
879
 
 
880
//---------------
 
881
void MSTrans::txn_ResetReadPosition(uint64_t pos)
 
882
{       
 
883
        bool rollover = (pos < txn_Start);
 
884
        enter_();
 
885
        
 
886
        if (pos >= txn_MaxRecords) { // Start of overflow
 
887
                lock_(this);
 
888
                
 
889
                // Overflow has occurred and the circular list is now empty 
 
890
                // so expand the list to include the overflow and 
 
891
                // reset txn_Start and txn_EOL
 
892
                txn_Start = txn_MaxRecords;
 
893
                txn_MaxRecords = txn_Overflow;
 
894
                txn_EOL = 0;
 
895
                txn_HaveOverflow = false;
 
896
                txn_Overflow = 0;
 
897
                
 
898
                CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
 
899
                CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
 
900
                txn_File->write(&(txn_DiskHeader.th_overflow_1),        offsetof(MSDiskTransHeadRec, th_overflow_1),    1);
 
901
                txn_File->write(&(txn_DiskHeader.th_list_size_8),       offsetof(MSDiskTransHeadRec, th_list_size_8),   8);
 
902
                                
 
903
                txn_ResetEOL();
 
904
                
 
905
                unlock_(this);
 
906
        } else
 
907
                txn_Start = pos;        
 
908
        
 
909
        ASSERT(txn_Start <= txn_MaxRecords);
 
910
        
 
911
        if (!rollover)
 
912
                txn_StartCheckPoint -= (pos - txn_Start);
 
913
        
 
914
        // Flush the header if the read position has rolled over or it is time. 
 
915
        if ( rollover || (txn_StartCheckPoint <=0)) {
 
916
                lock_(this);
 
917
                CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
918
                CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
919
                txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
 
920
CRASH_POINT(5);
 
921
                txn_File->flush();
 
922
                txn_File->sync();
 
923
                txn_StartCheckPoint = txn_MaxCheckPoint;
 
924
                unlock_(this);
 
925
        }
 
926
        
 
927
CRASH_POINT(6);
 
928
        
 
929
        if (TRANS_CAN_RESIZE) 
 
930
                txn_ResizeLog();
 
931
                
 
932
        exit_();
 
933
}
 
934
//---------------
 
935
bool MSTrans::txn_haveNextTransaction() 
 
936
{
 
937
        bool terminated = false;
 
938
        TRef ref;
 
939
        
 
940
        txn_TransCache->tc_GetTransaction(&ref, &terminated);
 
941
        
 
942
        return terminated;
 
943
}
 
944
 
 
945
//---------------
 
946
void MSTrans::txn_GetNextTransaction(MSTransPtr tran, MS_TxnState *state)
 
947
{
 
948
        bool terminated;
 
949
        uint64_t log_position;
 
950
        enter_();
 
951
        
 
952
        ASSERT(txn_reader == self);
 
953
        lock_(txn_reader);
 
954
        
 
955
        do {
 
956
                // Get the next completed transaction.
 
957
                // this will suspend the current thread, which is assumed
 
958
                // to be the log reader, until one is available.
 
959
                while ((!txn_IsTxnValid) && !self->myMustQuit) {
 
960
 
 
961
                        // wait until backup has completed.
 
962
                        while (txn_Doingbackup && !self->myMustQuit)
 
963
                                txn_PerformIdleTasks();
 
964
        
 
965
                        if (txn_TransCache->tc_GetTransaction(&txn_CurrentTxn, &terminated) && terminated) {
 
966
                                txn_IsTxnValid = true;
 
967
                                txn_TxnIndex = 0;
 
968
                        } else
 
969
                                txn_PerformIdleTasks();
 
970
                }
 
971
                
 
972
                if (self->myMustQuit)
 
973
                        exit_();
 
974
                        
 
975
                if (txn_TransCache->tc_GetRecAt(txn_CurrentTxn, txn_TxnIndex++, tran, state)) 
 
976
                        break;
 
977
                        
 
978
CRASH_POINT(7);
 
979
                txn_TransCache->tc_FreeTransaction(txn_CurrentTxn);
 
980
CRASH_POINT(8);
 
981
                if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) {
 
982
                        txn_ResetReadPosition(log_position);
 
983
                }else{
 
984
                        if (txn_TransCache->tc_ShoulReloadCache()) {
 
985
                                uint64_t pos = txn_TransCache->tc_StartCacheReload();
 
986
                                txn_ResetReadPosition(pos);
 
987
                                txn_LoadTransactionCache(pos);
 
988
                                txn_TransCache->tc_CompleteCacheReload();
 
989
                        } else {
 
990
                                // Lock the object to prevent writer thread updates while I check again.
 
991
                                // This is to ensure that txn_EOL is not changed between the call to
 
992
                                // tc_GetTransactionStartPosition() and setting the read position.
 
993
                                lock_(this);
 
994
                                if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) 
 
995
                                        txn_ResetReadPosition(log_position);
 
996
                                else
 
997
                                        txn_ResetReadPosition(txn_EOL);
 
998
                                unlock_(this);
 
999
                        }
 
1000
                }
 
1001
                
 
1002
                txn_IsTxnValid = false;
 
1003
                        
 
1004
        } while (1);
 
1005
        
 
1006
        unlock_(txn_reader);
 
1007
        exit_();
 
1008
}
 
1009
 
 
1010
 
 
1011
void MSTrans::txn_GetStats(MSTransStatsPtr stats)
 
1012
{
 
1013
        
 
1014
        if (txn_HaveOverflow) {
 
1015
                stats->ts_IsOverflowing = true;
 
1016
                stats->ts_LogSize = txn_Overflow;
 
1017
        } else {
 
1018
                stats->ts_IsOverflowing = false;
 
1019
                stats->ts_LogSize = txn_GetNumRecords();
 
1020
        }
 
1021
        stats->ts_PercentFull = (stats->ts_LogSize * 100) / CS_GET_DISK_8(txn_DiskHeader.th_requested_list_size_8);
 
1022
 
 
1023
        stats->ts_MaxSize = txn_HighWaterMark;
 
1024
        stats->ts_OverflowCount = txn_OverflowCount;
 
1025
        
 
1026
        stats->ts_TransCacheSize = txn_TransCache->tc_GetCacheUsed();
 
1027
        stats->ts_PercentTransCacheUsed = txn_TransCache->tc_GetPercentCacheUsed();
 
1028
        stats->ts_PercentCacheHit = txn_TransCache->tc_GetPercentCacheHit();
 
1029
}
 
1030
 
 
1031
void MSTrans::txn_SetCacheSize(uint32_t new_size)
 
1032
{
 
1033
        enter_();
 
1034
        // Important lock order. Writer threads never lock the reader but the reader
 
1035
        // may lock this object so always lock the reader first.
 
1036
        lock_(txn_reader);
 
1037
        lock_(this);
 
1038
 
 
1039
        CS_SET_DISK_4(txn_DiskHeader.th_requested_cache_size_4, new_size);
 
1040
        
 
1041
        txn_File->write(&(txn_DiskHeader.th_requested_cache_size_4), offsetof(MSDiskTransHeadRec, th_requested_cache_size_4), 4);
 
1042
        txn_File->flush();
 
1043
        txn_File->sync();
 
1044
 
 
1045
        txn_TransCache->tc_SetSize(new_size);
 
1046
 
 
1047
        unlock_(this);
 
1048
        unlock_(txn_reader);
 
1049
        exit_();
 
1050
}
 
1051
 
 
1052
void MSTrans::txn_SetLogSize(uint64_t new_size)
 
1053
{
 
1054
        enter_();
 
1055
        
 
1056
        // Important lock order. Writer threads never lock the reader but the reader
 
1057
        // may lock this object so always lock the reader first.
 
1058
        lock_(txn_reader);
 
1059
        lock_(this);
 
1060
        
 
1061
        txn_ReqestedMaxRecords = (new_size - sizeof(MSDiskTransHeadRec)) / sizeof(MSDiskTransRec);
 
1062
        
 
1063
        if (txn_ReqestedMaxRecords < 10)
 
1064
                txn_ReqestedMaxRecords = 10;
 
1065
        
 
1066
        CS_SET_DISK_8(txn_DiskHeader.th_requested_list_size_8, txn_ReqestedMaxRecords);
 
1067
        
 
1068
        txn_File->write(&(txn_DiskHeader.th_requested_list_size_8), offsetof(MSDiskTransHeadRec, th_requested_list_size_8), 8);
 
1069
        txn_File->flush();
 
1070
        txn_File->sync();
 
1071
        
 
1072
        unlock_(this);
 
1073
        unlock_(txn_reader);
 
1074
        
 
1075
        exit_();
 
1076
}
 
1077
 
 
1078
// A helper class for resetting database IDs in the transaction log.
 
1079
class DBSearchTXNLog : ReadTXNLog {
 
1080
        public:
 
1081
        DBSearchTXNLog(MSTrans *log): ReadTXNLog(log), sdb_db_id(0), sdb_isDirty(false) {}
 
1082
        
 
1083
        uint32_t sdb_db_id; 
 
1084
        bool sdb_isDirty;
 
1085
        
 
1086
        virtual bool rl_CanContinue() { return true;}
 
1087
        virtual void rl_Load(uint64_t log_position, MSTransPtr rec) 
 
1088
        {
 
1089
                if  (rec->tr_db_id == sdb_db_id) {
 
1090
                        sdb_isDirty = true;
 
1091
                        rec->tr_db_id = 0;
 
1092
                        rl_Store(log_position, rec);
 
1093
                } 
 
1094
        }
 
1095
        
 
1096
        void SetDataBaseIDToZero(uint32_t db_id)
 
1097
        {
 
1098
                sdb_db_id = db_id;
 
1099
                rl_ReadLog(rl_log->txn_GetStartPosition(), false);
 
1100
                if (sdb_isDirty)
 
1101
                        rl_Flush();
 
1102
        }
 
1103
};
 
1104
 
 
1105
// Dropping the database from the transaction log just involves
 
1106
// scanning the log and setting the database id of any transactions 
 
1107
// involving the dropped database to zero.
 
1108
void MSTrans::txn_dropDatabase(uint32_t db_id)
 
1109
{
 
1110
        enter_();
 
1111
        
 
1112
        // Important lock order. Writer threads never lock the reader but the reader
 
1113
        // may lock this object so always lock the reader first.
 
1114
        lock_(txn_reader);
 
1115
        lock_(this);
 
1116
        
 
1117
        // Clear any transaction records in the cache for the dropped database;
 
1118
        txn_TransCache->tc_dropDatabase(db_id);
 
1119
        
 
1120
        // Scan the log setting the database ID for any record belonging to the
 
1121
        // dropped database to zero. 
 
1122
        DBSearchTXNLog searchLog(this);
 
1123
        
 
1124
        searchLog.SetDataBaseIDToZero(db_id);
 
1125
                
 
1126
        unlock_(this);
 
1127
        unlock_(txn_reader);
 
1128
        exit_();
 
1129
}
 
1130
 
 
1131
#ifdef DEBUG    
 
1132
void MSTrans::txn_DumpLog(const char *file)
 
1133
{
 
1134
        size_t  size, read_start = 0;
 
1135
        FILE *fptr;
 
1136
        enter_();
 
1137
        
 
1138
        fptr = fopen(file, "w+");
 
1139
        if (!fptr) {
 
1140
                perror(file);
 
1141
                return;
 
1142
        }
 
1143
        
 
1144
        if (txn_Overflow)
 
1145
                size = txn_Overflow;
 
1146
        else
 
1147
                size = txn_MaxRecords;
 
1148
        
 
1149
        // Dump all the records
 
1150
        while (size) {
 
1151
                MSDiskTransRec diskRecords[1000];
 
1152
                uint32_t read_size;
 
1153
                off_t offset;
 
1154
                
 
1155
                if (size > 1000) 
 
1156
                        read_size = 1000 ;
 
1157
                else
 
1158
                        read_size = size ;
 
1159
                
 
1160
                // Read the next block of records.
 
1161
                offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);              
 
1162
                txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
 
1163
                
 
1164
                for (uint32_t i = 0; i < read_size; i++) {
 
1165
                        const char *ttype, *cmt;
 
1166
                        MSTransRec rec;
 
1167
                        MSDiskTransPtr drec = diskRecords + i;
 
1168
                        GET_DISK_TRANSREC(&rec, drec);
 
1169
                        
 
1170
                        switch (TRANS_TYPE(rec.tr_type)) {
 
1171
                                case MS_ReferenceTxn:
 
1172
                                        ttype = "+";
 
1173
                                        break;
 
1174
                                case MS_DereferenceTxn:
 
1175
                                        ttype = "-";
 
1176
                                        break;
 
1177
                                case MS_RollBackTxn:
 
1178
                                        ttype = "rb";
 
1179
                                        rec.tr_blob_ref_id = 0;
 
1180
                                        break;
 
1181
                                case MS_RecoveredTxn:
 
1182
                                        ttype = "rcov";
 
1183
                                        rec.tr_blob_ref_id = 0;
 
1184
                                        break;
 
1185
                                default:
 
1186
                                        ttype = "???";
 
1187
                        }
 
1188
                        
 
1189
                        if (TRANS_IS_TERMINATED(rec.tr_type))
 
1190
                                cmt = "c";
 
1191
                        else
 
1192
                                cmt = "";
 
1193
                        
 
1194
                        
 
1195
                        fprintf(fptr, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" \t %s %s %s\n", rec.tr_id, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, 
 
1196
                                ((read_start + i) == txn_Start) ? "START":"",
 
1197
                                ((read_start + i) == txn_EOL) ? "EOL":"",
 
1198
                                ((read_start + i) == txn_MaxRecords) ? "OverFlow":""
 
1199
                                );
 
1200
                }
 
1201
                
 
1202
                size -= read_size;
 
1203
                read_start += read_size;
 
1204
        }
 
1205
        fclose(fptr);
 
1206
        exit_();
 
1207
}       
 
1208
 
 
1209
#endif
 
1210