~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/trans_log_ms.cc

  • Committer: Monty Taylor
  • Date: 2010-07-06 00:44:32 UTC
  • mfrom: (1643.1.13 build)
  • Revision ID: mordred@inaugust.com-20100706004432-843uftc92rc2497l
Merged in PBMS, translation updates, a few build fixes and a few bug fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Barry Leslie
 
20
 *
 
21
 * 2009-06-09
 
22
 *
 
23
 * H&G2JCtL
 
24
 *
 
25
 * PBMS transaction handling.
 
26
 *
 
27
 * PBMS uses 1 circular transaction log. All BLOB reference operations are written to this log
 
28
 * and are applied to the repository when committed. There is 1 thread dedicated to reading the 
 
29
 * transaction log and applying the changes. During an engine level backup this thread is suspended 
 
30
 * so that no transactions will be applied to the repository files as they are backed up.
 
31
 *
 
32
 */
 
33
#include "cslib/CSConfig.h"
 
34
 
 
35
#include <stdlib.h>
 
36
#include <inttypes.h>
 
37
 
 
38
#include "cslib/CSGlobal.h"
 
39
#include "cslib/CSStrUtil.h"
 
40
#include "cslib/CSStorage.h"
 
41
 
 
42
#include "trans_log_ms.h"
 
43
#include "trans_cache_ms.h"
 
44
 
 
45
#ifdef CRASH_TEST
 
46
uint32_t        trans_test_crash_point;
 
47
#define CRASH_POINT(p) { if (p == trans_test_crash_point) { char *ptr = NULL; printf("Crash on demand at: %s(%d), start: %"PRIu64", eol: %"PRIu64"\n", __FILE__, __LINE__, txn_Start, txn_EOL); *ptr = 88;}}
 
48
#else
 
49
#define CRASH_POINT(p)
 
50
#endif
 
51
 
 
52
#define MS_TRANS_LOG_MAGIC                      0xA6E7D7B3
 
53
#define MS_TRANS_LOG_VERSION            1
 
54
#define MS_TRANS_LOG_RECOVERED          0XA1
 
55
#define MS_TRANS_LOG_NOT_RECOVERED      0XA2
 
56
#define MS_TRANS_NO_OVERFLOW            0XB1
 
57
#define MS_TRANS_OVERFLOW                       0XB2
 
58
 
 
59
#define DFLT_TRANS_CHECKPOINT_THRESHOLD 1024
 
60
 
 
61
#define DFLT_TRANS_LOG_LIST_SIZE        (1024 * 10)
 
62
#define DFLT_TRANS_CACHE_SIZE           (500)
 
63
 
 
64
#define TRANS_CAN_RESIZE        ((txn_MaxRecords != txn_ReqestedMaxRecords) && (txn_EOL >= txn_Start) && !txn_HaveOverflow)
 
65
 
 
66
typedef struct MSDiskTrans {
 
67
        CSDiskValue4    dtr_id_4;                       // The transaction ID
 
68
        CSDiskValue1    dtr_type_1;                     // The transaction type. If the first bit is set then the transaction is an autocommit.
 
69
        CSDiskValue1    dtr_check_1;            // The trransaction record checksum.
 
70
        CSDiskValue4    dtr_db_id_4;            // The database ID for the operation.
 
71
        CSDiskValue4    dtr_tab_id_4;           // The table ID for the operation.
 
72
        CSDiskValue8    dtr_blob_id_8;          // The blob ID for the operation.
 
73
        CSDiskValue8    dtr_blob_ref_id_8;      // The blob reference id.
 
74
} MSDiskTransRec, *MSDiskTransPtr;
 
75
 
 
76
#define SET_DISK_TRANSREC(d, s) { \
 
77
        CS_SET_DISK_4((d)->dtr_id_4, (s)->tr_id);\
 
78
        CS_SET_DISK_1((d)->dtr_type_1, (s)->tr_type);\
 
79
        CS_SET_DISK_1((d)->dtr_check_1, (s)->tr_check);\
 
80
        CS_SET_DISK_4((d)->dtr_db_id_4, (s)->tr_db_id);\
 
81
        CS_SET_DISK_4((d)->dtr_tab_id_4, (s)->tr_tab_id);\
 
82
        CS_SET_DISK_8((d)->dtr_blob_id_8, (s)->tr_blob_id);\
 
83
        CS_SET_DISK_8((d)->dtr_blob_ref_id_8, (s)->tr_blob_ref_id);\
 
84
}
 
85
 
 
86
#define GET_DISK_TRANSREC(s, d) { \
 
87
        (s)->tr_id = CS_GET_DISK_4((d)->dtr_id_4);\
 
88
        (s)->tr_type = CS_GET_DISK_1((d)->dtr_type_1);\
 
89
        (s)->tr_check = CS_GET_DISK_1((d)->dtr_check_1);\
 
90
        (s)->tr_db_id = CS_GET_DISK_4((d)->dtr_db_id_4);\
 
91
        (s)->tr_tab_id = CS_GET_DISK_4((d)->dtr_tab_id_4);\
 
92
        (s)->tr_blob_id = CS_GET_DISK_8((d)->dtr_blob_id_8);\
 
93
        (s)->tr_blob_ref_id = CS_GET_DISK_8((d)->dtr_blob_ref_id_8);\
 
94
}
 
95
 
 
96
static uint8_t checksum(uint8_t *data, size_t len)
 
97
{
 
98
        register uint32_t       sum = 0, g;
 
99
        uint8_t                         *chk;
 
100
 
 
101
        chk = data + len - 1;
 
102
        while (chk > data) {
 
103
                sum = (sum << 4) + *chk;
 
104
                if ((g = sum & 0xF0000000)) {
 
105
                        sum = sum ^ (g >> 24);
 
106
                        sum = sum ^ g;
 
107
                }
 
108
                chk--;
 
109
        }
 
110
        return (uint8_t) (sum ^ (sum >> 24) ^ (sum >> 16) ^ (sum >> 8));
 
111
}
 
112
 
 
113
MSTrans::MSTrans() : 
 
114
        CSSharedRefObject(),
 
115
        txn_MaxCheckPoint(0),
 
116
        txn_Doingbackup(false),
 
117
        txn_reader(NULL),
 
118
        txn_IsTxnValid(false),
 
119
        txn_CurrentTxn(0),
 
120
        txn_TxnIndex(0),
 
121
        txn_StartCheckPoint(0),
 
122
        txn_TransCache(NULL),
 
123
        txn_BlockingTransaction(0),
 
124
        txn_File(NULL),
 
125
        txn_EOLCheckPoint(0),
 
126
        txn_MaxRecords(0),
 
127
        txn_ReqestedMaxRecords(0),
 
128
        txn_HighWaterMark(0),
 
129
        txn_OverflowCount(0),
 
130
        txn_MaxTID(0),
 
131
        txn_Recovered(false),
 
132
        txn_HaveOverflow(false),
 
133
        txn_Overflow(0),
 
134
        txn_EOL(0),
 
135
        txn_Start(0),
 
136
        txn_Checksum(0)
 
137
{
 
138
}
 
139
 
 
140
MSTrans::~MSTrans()
 
141
{
 
142
        txn_Close();
 
143
        if (txn_TransCache) 
 
144
                txn_TransCache->release();
 
145
                
 
146
}       
 
147
 
 
148
void MSTrans::txn_Close()
 
149
{
 
150
        
 
151
        if (txn_File) {         
 
152
                // Set the header to indicate that the log has not been closed properly. 
 
153
                CS_SET_DISK_4(txn_DiskHeader.th_next_txn_id_4, txn_MaxTID);
 
154
                txn_File->write(&(txn_DiskHeader.th_next_txn_id_4), offsetof(MSDiskTransHeadRec, th_next_txn_id_4), 4 );
 
155
 
 
156
                CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
157
                CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
158
                CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
 
159
                txn_File->write(&(txn_DiskHeader.th_start_8), 
 
160
                                                        offsetof(MSDiskTransHeadRec, th_start_8), 
 
161
                                                        sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
 
162
CRASH_POINT(1);
 
163
                txn_File->flush();
 
164
                txn_File->sync();
 
165
 
 
166
                if (txn_Recovered) {
 
167
                        // Write the recovered flag seperately just incase of a crash during the write operation.
 
168
                        CS_SET_DISK_1(txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);           
 
169
                        txn_File->write(&(txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1 );
 
170
                        txn_File->flush();
 
171
                        txn_File->sync();
 
172
                }
 
173
                
 
174
                txn_File->close();
 
175
                txn_File->release();
 
176
                txn_File = NULL;                
 
177
        }
 
178
}
 
179
void MSTrans::txn_SetFile(CSFile *tr_file)
 
180
{
 
181
        txn_File = tr_file;
 
182
}
 
183
 
 
184
//#define TRACE_ALL
 
185
#ifdef TRACE_ALL
 
186
static FILE *txn_debug_log;
 
187
#endif
 
188
 
 
189
MSTrans *MSTrans::txn_NewMSTrans(const char *log_path, bool dump_log)
 
190
{
 
191
        MSTrans *trans = NULL;
 
192
        CSPath *path = NULL;
 
193
        uint64_t log_size;
 
194
        enter_();
 
195
 
 
196
        (void) dump_log;
 
197
        
 
198
        new_(trans, MSTrans());
 
199
        push_(trans);
 
200
 
 
201
        path = CSPath::newPath(log_path);
 
202
        push_(path);    
 
203
                
 
204
try_again:
 
205
        
 
206
        
 
207
        if (!path->exists()) { // Create the transaction log.   
 
208
                CSFile *tr_file = path->createFile(CSFile::CREATE);
 
209
                push_(tr_file);
 
210
                
 
211
                log_size = DFLT_TRANS_LOG_LIST_SIZE * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
 
212
 
 
213
                // Preallocate the log space and initialize it.
 
214
                MSDiskTransRec recs[1024] = {};
 
215
                off64_t offset = sizeof(MSDiskTransHeadRec);
 
216
                uint64_t num_records = DFLT_TRANS_LOG_LIST_SIZE;
 
217
                size_t size;
 
218
                
 
219
                while (num_records) {
 
220
                        if (num_records < 1024)
 
221
                                size = num_records;
 
222
                        else
 
223
                                size = 1024;
 
224
                        tr_file->write(recs, offset, size * sizeof(MSDiskTransRec));
 
225
                        offset += size * sizeof(MSDiskTransRec);
 
226
                        num_records -= size;
 
227
                }
 
228
 
 
229
                trans->txn_MaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
 
230
                trans->txn_ReqestedMaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
 
231
                trans->txn_MaxCheckPoint = DFLT_TRANS_CHECKPOINT_THRESHOLD;
 
232
                trans->txn_MaxTID = 1;
 
233
 
 
234
                // Initialize the log header.
 
235
                CS_SET_DISK_4(trans->txn_DiskHeader.th_magic_4, MS_TRANS_LOG_MAGIC);
 
236
                CS_SET_DISK_2(trans->txn_DiskHeader.th_version_2, MS_TRANS_LOG_VERSION);
 
237
                
 
238
                CS_SET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4, trans->txn_MaxTID);
 
239
 
 
240
                CS_SET_DISK_2(trans->txn_DiskHeader.th_check_point_2, trans->txn_MaxCheckPoint);
 
241
 
 
242
                CS_SET_DISK_8(trans->txn_DiskHeader.th_list_size_8, trans->txn_MaxRecords);
 
243
                CS_SET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8, trans->txn_ReqestedMaxRecords);
 
244
 
 
245
                CS_SET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4, DFLT_TRANS_CACHE_SIZE);
 
246
                
 
247
                CS_SET_DISK_8(trans->txn_DiskHeader.th_start_8, 0);
 
248
                CS_SET_DISK_8(trans->txn_DiskHeader.th_eol_8, 0);
 
249
                
 
250
                CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
 
251
                CS_SET_DISK_1(trans->txn_DiskHeader.th_checksum_1, 1);
 
252
                CS_SET_DISK_1(trans->txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
 
253
                
 
254
                tr_file->write(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec));
 
255
                pop_(tr_file);
 
256
                trans->txn_SetFile(tr_file);
 
257
                
 
258
                trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
 
259
                
 
260
                trans->txn_TransCache = MSTransCache::newMSTransCache(DFLT_TRANS_CACHE_SIZE);
 
261
        } else { // The transaction log already exists
 
262
                bool overflow = false, recovered = false;
 
263
                
 
264
                CSFile *tr_file = path->createFile(CSFile::DEFAULT); // Open read/write
 
265
                push_(tr_file);
 
266
                
 
267
                // Read the log header:
 
268
                if (tr_file->read(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec), 0) < sizeof(MSDiskTransHeadRec)) {
 
269
                        release_(tr_file);
 
270
                        path->removeFile();
 
271
                        goto try_again;
 
272
                }
 
273
                
 
274
                // check the log header:                
 
275
                if (CS_GET_DISK_4(trans->txn_DiskHeader.th_magic_4) != MS_TRANS_LOG_MAGIC)
 
276
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_HEADER_MAGIC);
 
277
                
 
278
                if (CS_GET_DISK_2(trans->txn_DiskHeader.th_version_2) != MS_TRANS_LOG_VERSION)
 
279
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_VERSION_TOO_NEW);
 
280
                        
 
281
                //----
 
282
                if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_NO_OVERFLOW) 
 
283
                        overflow = false;
 
284
                else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_OVERFLOW) 
 
285
                        overflow = true;
 
286
                else 
 
287
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
 
288
                
 
289
                //----
 
290
                if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_NOT_RECOVERED) 
 
291
                        recovered = false;
 
292
                else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_RECOVERED) 
 
293
                        recovered = true;
 
294
                else 
 
295
                        CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
 
296
 
 
297
                // Check that the log is the expected size.
 
298
                log_size = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8) * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
 
299
                
 
300
                if ((log_size > tr_file->getEOF()) || 
 
301
                        ((log_size < tr_file->getEOF()) && !overflow)){ 
 
302
                                        
 
303
                        char buffer[CS_EXC_MESSAGE_SIZE];               
 
304
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unexpected transaction log size: ");
 
305
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, path->getCString());
 
306
                        CSException::throwException(CS_CONTEXT, CS_ERR_BAD_FILE_HEADER, buffer);
 
307
                }
 
308
                
 
309
                trans->txn_MaxTID = CS_GET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4);
 
310
 
 
311
                // Looks good, we will assume it is a valid log file.
 
312
                trans->txn_TransCache = MSTransCache::newMSTransCache(CS_GET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4));
 
313
 
 
314
                pop_(tr_file);
 
315
                trans->txn_SetFile(tr_file);
 
316
 
 
317
                trans->txn_MaxCheckPoint = CS_GET_DISK_2(trans->txn_DiskHeader.th_check_point_2);
 
318
                
 
319
                trans->txn_MaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8);
 
320
                trans->txn_ReqestedMaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8);
 
321
                
 
322
                trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
 
323
                trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
 
324
                trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
 
325
                trans->txn_HaveOverflow = overflow;
 
326
                if (overflow) 
 
327
                        trans->txn_Overflow = (tr_file->getEOF() - sizeof(MSDiskTransHeadRec)) /sizeof(MSDiskTransRec); 
 
328
                else
 
329
                        trans->txn_Overflow = 0;
 
330
 
 
331
#ifdef DEBUG
 
332
                if (overflow)
 
333
                        printf("Recovering overflow log\n");
 
334
                if (dump_log) {
 
335
                        char name[100];
 
336
                        snprintf(name, 100, "%dms-trans-log.dump", (int)time(NULL));
 
337
                        trans->txn_DumpLog(name);
 
338
                }
 
339
#endif
 
340
                // Recover the log if required.
 
341
                if (!recovered)
 
342
                        trans->txn_Recover();
 
343
                        
 
344
                        
 
345
        }
 
346
        
 
347
        trans->txn_Recovered = true; // Any recovery required has been completed.
 
348
 
 
349
        // The log has been recovered so these values should be valid:
 
350
        trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
 
351
        trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
 
352
        
 
353
        // Set the header to indicate that the log has not been closed properly. 
 
354
        // This is reset when the log is closed during shutdown.
 
355
        CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_NOT_RECOVERED);
 
356
        trans->txn_File->write(&(trans->txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1);
 
357
        
 
358
        // Load the transaction records into memory.
 
359
        trans->txn_TransCache->tc_StartCacheReload(true);
 
360
        trans->txn_LoadTransactionCache(trans->txn_Start);
 
361
        trans->txn_TransCache->tc_CompleteCacheReload();
 
362
        
 
363
        if (trans->txn_MaxRecords != trans->txn_ReqestedMaxRecords) 
 
364
                trans->txn_ResizeLog(); // Try to resize but it may not be possible yet.
 
365
        
 
366
        release_(path);
 
367
        pop_(trans);
 
368
 
 
369
#ifdef TRACE_ALL
 
370
        
 
371
        txn_debug_log = fopen("log_dump.txt", "w+");
 
372
        if (!txn_debug_log) {
 
373
                perror("log_dump.txt");
 
374
        }
 
375
#endif
 
376
        
 
377
        return_(trans);
 
378
}
 
379
 
 
380
bool MSTrans::txn_ValidRecord(MSTransPtr rec)
 
381
{
 
382
        uint8_t check = rec->tr_check;
 
383
        bool ok;
 
384
        
 
385
        rec->tr_check = txn_Checksum;
 
386
        ok = (checksum((uint8_t*)rec, sizeof(MSTransRec)) == check);
 
387
        rec->tr_check = check;
 
388
        return ok;
 
389
}
 
390
 
 
391
void MSTrans::txn_GetRecordAt(uint64_t index, MSTransPtr rec)
 
392
{
 
393
        MSDiskTransRec drec;
 
394
        off64_t offset;
 
395
        
 
396
        // Read 1 record from the log and convert it from disk format.
 
397
        offset = sizeof(MSDiskTransHeadRec) + index * sizeof(MSDiskTransRec);           
 
398
        txn_File->read(&drec, offset, sizeof(MSDiskTransRec), sizeof(MSDiskTransRec));
 
399
        GET_DISK_TRANSREC(rec, &drec);
 
400
}
 
401
 
 
402
// Recovery involves finding the start of the first record and the eof
 
403
// position. The positions will be found at or after the position stored
 
404
// in the header. 
 
405
void MSTrans::txn_Recover()
 
406
{
 
407
        MSTransRec rec = {0,0,0,0,0,0,0};
 
408
        uint64_t original_eol = txn_EOL;
 
409
        enter_();
 
410
 
 
411
#ifdef DEBUG
 
412
printf("Recovering      transaction log!\n");
 
413
#endif
 
414
 
 
415
        txn_MaxTID = 0;
 
416
        // Search for the last valid record in the log starting from the last 
 
417
        // known position stored in the header.
 
418
        for (; txn_EOL < txn_MaxRecords; txn_EOL++) {
 
419
                txn_GetRecordAt(txn_EOL, &rec);
 
420
                if (! txn_ValidRecord(&rec))
 
421
                        break;  
 
422
        }
 
423
        
 
424
        if (txn_EOL == txn_MaxRecords) {
 
425
                // It looks like all the records in the log are valid?
 
426
                // This is strange but could happen if the crash
 
427
                // occurred just before updating the header as the
 
428
                // eol position rolled over to the top of the log.
 
429
                txn_EOL = 0;
 
430
        }
 
431
        
 
432
        txn_MaxTID++;
 
433
        
 
434
        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
435
        
 
436
        // If the actual eol has moved pass the recorded start position
 
437
        // then the actuall start position must be some where beyond
 
438
        // the eol.
 
439
        if (((original_eol < txn_Start) || (original_eol > txn_EOL)) && (txn_EOL >= txn_Start)) 
 
440
                txn_Start = txn_EOL +1; 
 
441
 
 
442
        // Position the start at the beginning of a transaction.        
 
443
        uint64_t end_search = (txn_Start < txn_EOL)? txn_EOL : txn_MaxRecords;
 
444
        for (; txn_Start < end_search; txn_Start++) {
 
445
                txn_GetRecordAt(txn_Start, &rec);
 
446
                if (TRANS_IS_START(rec.tr_type))
 
447
                        break;  
 
448
        }
 
449
 
 
450
        if (txn_Start == end_search)
 
451
                txn_Start = txn_EOL; 
 
452
                
 
453
        CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
454
        
 
455
        txn_TransCache->tc_SetRecovering(true); 
 
456
        // Load the transaction records into the cache.
 
457
        txn_TransCache->tc_StartCacheReload(true);
 
458
        txn_LoadTransactionCache(txn_Start);
 
459
        txn_TransCache->tc_CompleteCacheReload();
 
460
        
 
461
        // Now go through all the transactions and add rollbacks for any
 
462
        // unterminated transactions.
 
463
        TRef ref;
 
464
        bool terminated;
 
465
        while  (txn_TransCache->tc_GetTransaction(&ref, &terminated)) {
 
466
                
 
467
                txn_MaxTID = txn_TransCache->tc_GetTransactionID(ref); // Save the TID of the last transaction.
 
468
                if (!terminated) {
 
469
                        self->myTID = txn_MaxTID;
 
470
                        self->myTransRef = ref;
 
471
                        self->myStartTxn = false;
 
472
                        txn_AddTransaction(MS_RecoveredTxn);
 
473
                }
 
474
CRASH_POINT(2);
 
475
                txn_TransCache->tc_FreeTransaction(ref);
 
476
                
 
477
                // Load the next block of transactions into the cache.
 
478
                // This needs to be done after each tc_GetTransaction() to make sure
 
479
                // that if the transaction terminator is some where in the log
 
480
                // it will get read even if the cache is completely full. 
 
481
                if (txn_TransCache->tc_ShoulReloadCache()) {
 
482
                        txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload(true));
 
483
                        txn_TransCache->tc_CompleteCacheReload();
 
484
                }
 
485
        }
 
486
        
 
487
        
 
488
        txn_TransCache->tc_SetRecovering(false); 
 
489
        self->myTransRef = 0;
 
490
        
 
491
        // Update the header again incase rollbacks have been added.
 
492
        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
493
                                
 
494
        exit_();
 
495
}
 
496
 
 
497
bool ReadTXNLog::rl_CanContinue() 
 
498
 
499
        return rl_log->txn_TransCache->tc_ContinueCacheReload();
 
500
}
 
501
 
 
502
void ReadTXNLog::rl_Load(uint64_t log_position, MSTransPtr rec) 
 
503
{
 
504
        rl_log->txn_TransCache->tc_AddRec(log_position, rec);
 
505
}
 
506
 
 
507
void ReadTXNLog::rl_Store(uint64_t log_position, MSTransPtr rec) 
 
508
{
 
509
        MSDiskTransRec drec;
 
510
        SET_DISK_TRANSREC(&drec, rec);
 
511
 
 
512
        rl_log->txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + log_position * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec));
 
513
}
 
514
 
 
515
void ReadTXNLog::rl_Flush() 
 
516
{
 
517
        rl_log->txn_File->flush();
 
518
        rl_log->txn_File->sync();
 
519
}
 
520
 
 
521
void ReadTXNLog::rl_ReadLog(uint64_t read_start, bool log_locked)
 
522
{
 
523
        uint64_t        size, orig_size;
 
524
        bool reading_overflow = (read_start >= rl_log->txn_MaxRecords);
 
525
        enter_();
 
526
        
 
527
 
 
528
        // Get the number of transaction records to be loaded.
 
529
        if (reading_overflow) {
 
530
                orig_size = rl_log->txn_Overflow;
 
531
                size = rl_log->txn_Overflow - read_start;
 
532
        } else {
 
533
                orig_size = rl_log->txn_GetNumRecords();
 
534
        
 
535
                if (rl_log->txn_Start <= read_start)
 
536
                        size = orig_size - (read_start - rl_log->txn_Start);
 
537
                else 
 
538
                        size = rl_log->txn_EOL - read_start;
 
539
        }
 
540
        
 
541
        // load all the records
 
542
        while (size && rl_CanContinue()) {
 
543
                MSDiskTransRec diskRecords[1000];
 
544
                uint32_t read_size;
 
545
                off64_t offset;
 
546
                
 
547
                if (size > 1000) 
 
548
                        read_size = 1000 ;
 
549
                else
 
550
                        read_size = size ;
 
551
                
 
552
                // Check if we have reached the wrap around point in the log.
 
553
                if ((!reading_overflow) && (rl_log->txn_EOL < read_start) && ((rl_log->txn_MaxRecords - read_start) < read_size))
 
554
                        read_size = rl_log->txn_MaxRecords - read_start ;
 
555
 
 
556
                // Read the next block of records.
 
557
                offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);              
 
558
                rl_log->txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
 
559
                
 
560
                // Convert the records from disk format and add them to the cache.
 
561
                for (uint32_t i = 0; i < read_size && rl_CanContinue(); i++) {
 
562
                        MSTransRec rec;
 
563
                        MSDiskTransPtr drec = diskRecords + i;
 
564
                        GET_DISK_TRANSREC(&rec, drec);
 
565
                        
 
566
                        rl_Load(read_start + i, &rec); 
 
567
                }
 
568
                
 
569
                size -= read_size;
 
570
                read_start += read_size;
 
571
                if (read_start == rl_log->txn_MaxRecords)
 
572
                        read_start = 0;
 
573
        }
 
574
        
 
575
        if (rl_log->txn_HaveOverflow && !reading_overflow) {
 
576
                if (rl_CanContinue()) 
 
577
                        rl_ReadLog(rl_log->txn_MaxRecords, false);
 
578
                
 
579
        } else if (!log_locked) {       
 
580
                // The following is intended to prevent the case where a writer 
 
581
                // writes an txn record while the cache is full but just after 
 
582
                // the reload has completed. If the cache is not yet full we need
 
583
                // to load as many of the new records into cache as possible.
 
584
        
 
585
                uint64_t        new_size;
 
586
                lock_(rl_log);
 
587
                if (reading_overflow)
 
588
                        new_size = rl_log->txn_Overflow;
 
589
                else 
 
590
                        new_size = rl_log->txn_GetNumRecords();
 
591
                if (rl_CanContinue() && (orig_size != new_size)) {
 
592
                        rl_ReadLog(read_start, true);
 
593
                }
 
594
                unlock_(rl_log);
 
595
        }
 
596
        
 
597
 
 
598
        exit_();
 
599
}
 
600
 
 
601
void MSTrans::txn_LoadTransactionCache(uint64_t read_start)
 
602
{
 
603
        ReadTXNLog log(this);
 
604
        enter_();
 
605
        log.rl_ReadLog(read_start, false);
 
606
        txn_TransCache->tc_UpdateCacheVersion(); // Signal writes to recheck cache for overflow txn refs.
 
607
        exit_();
 
608
}
 
609
 
 
610
void  MSTrans::txn_ResizeLog()
 
611
{
 
612
        enter_();
 
613
        
 
614
        lock_(this);
 
615
        if (TRANS_CAN_RESIZE) {
 
616
                // TRANS_CAN_RESIZE checks that there is no overflow and the the start position 
 
617
                // is less than eol. This implies the from eol to the end of file doesn't contain
 
618
                // and used records.
 
619
                
 
620
 
 
621
#ifdef DEBUG    
 
622
                uint64_t old_size = txn_MaxRecords;
 
623
#endif          
 
624
                if (txn_MaxRecords > txn_ReqestedMaxRecords) { // Shrink the log
 
625
                        uint64_t max_resize = txn_MaxRecords - txn_EOL;
 
626
                        
 
627
                        if ( txn_Start == txn_EOL)
 
628
                                max_resize = txn_MaxRecords;
 
629
                        else {
 
630
                                max_resize = txn_MaxRecords - txn_EOL;
 
631
                                if (!txn_Start) // If start is at '0' then the EOL cannot be wrapped.
 
632
                                        max_resize--;
 
633
                        }
 
634
                        
 
635
                                
 
636
                        if (max_resize > (txn_MaxRecords - txn_ReqestedMaxRecords))
 
637
                                max_resize = txn_MaxRecords - txn_ReqestedMaxRecords;
 
638
                                                        
 
639
                        txn_MaxRecords -=       max_resize;
 
640
                } else
 
641
                        txn_MaxRecords = txn_ReqestedMaxRecords; // Grow the log
 
642
 
 
643
#ifdef DEBUG                    
 
644
                char buffer[CS_EXC_MESSAGE_SIZE];               
 
645
                snprintf(buffer, CS_EXC_MESSAGE_SIZE, "Resizing the Transaction log from %"PRIu64" to %"PRIu64" \n",  old_size, txn_MaxRecords);
 
646
                CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, buffer);
 
647
#endif
 
648
 
 
649
                CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
 
650
                
 
651
                txn_File->setEOF(txn_MaxRecords * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec));
 
652
                txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
 
653
                
 
654
                if (txn_Start == txn_EOL) {
 
655
                        txn_Start = 0;
 
656
                        txn_EOL = 0;
 
657
                } else if (txn_MaxRecords == txn_EOL) {
 
658
                        txn_EOL = 0;
 
659
                }
 
660
                
 
661
                txn_ResetEOL();
 
662
                                
 
663
        }       
 
664
        unlock_(this);
 
665
        
 
666
        exit_();
 
667
}
 
668
 
 
669
void  MSTrans::txn_ResetEOL()
 
670
{
 
671
        enter_();
 
672
        
 
673
        txn_EOLCheckPoint = txn_MaxCheckPoint;
 
674
        txn_StartCheckPoint = txn_MaxCheckPoint;
 
675
        
 
676
        if (!txn_EOL)
 
677
                txn_Checksum++;
 
678
        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
679
        CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
680
        CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
 
681
        txn_File->write(&(txn_DiskHeader.th_start_8), 
 
682
                                                offsetof(MSDiskTransHeadRec, th_start_8), 
 
683
                                                sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
 
684
CRASH_POINT(5);
 
685
        txn_File->flush();
 
686
        txn_File->sync();
 
687
CRASH_POINT(10);
 
688
                
 
689
        exit_();
 
690
}
 
691
 
 
692
#define PRINT_TRANS(tid, a, t) 
 
693
 
 
694
#ifndef PRINT_TRANS
 
695
#define PRINT_TRANS(tid, a, t) printTrans(tid, a, t)
 
696
static void printTrans(uint32_t tid, bool autocommit, MS_Txn type)
 
697
{
 
698
        const char *type_name = "???";
 
699
        
 
700
        switch (type) {
 
701
                case MS_RollBackTxn:
 
702
                        type_name = "Rollback";
 
703
                        break;
 
704
                case MS_PartialRollBackTxn:
 
705
                        type_name = "PartialRollBack";
 
706
                        break;
 
707
                case MS_CommitTxn:
 
708
                        type_name = "Commit";
 
709
                        break;
 
710
                case MS_ReferenceTxn:
 
711
                        type_name = "Reference";
 
712
                        break;
 
713
                case MS_DereferenceTxn:
 
714
                        type_name = "Dereference";
 
715
                        break;
 
716
                case MS_RecoveredTxn:
 
717
                        type_name = "Recovered";
 
718
                        break;
 
719
        }
 
720
        
 
721
        fprintf(stderr, "MSTrans::txn_LogTransaction(%d, autocommit = %s, %s)\n", tid, (autocommit)?"On":"Off", type_name);
 
722
 
 
723
}
 
724
#endif
 
725
 
 
726
void MSTrans::txn_LogTransaction(MS_Txn type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
 
727
{
 
728
        enter_();
 
729
        
 
730
        lock_(this);
 
731
        if (!self->myTID) {
 
732
                switch (type) {
 
733
                        case MS_RollBackTxn:
 
734
                        case MS_PartialRollBackTxn:
 
735
                        case MS_CommitTxn: {
 
736
                                unlock_(this);
 
737
                                exit_();;
 
738
                        }
 
739
                        case MS_ReferenceTxn:
 
740
                        case MS_DereferenceTxn:
 
741
                        case MS_RecoveredTxn:
 
742
                                break;
 
743
                }
 
744
                txn_MaxTID++;
 
745
                self->myTID = txn_MaxTID;
 
746
                self->myTransRef = TRANS_CACHE_NEW_REF;
 
747
                self->myStartTxn = true;
 
748
        }
 
749
 
 
750
        PRINT_TRANS(self->myTID, autocommit, type);
 
751
        
 
752
        txn_AddTransaction(type, autocommit, db_id, tab_id, blob_id, blob_ref_id);
 
753
        if (autocommit || TRANS_TYPE_IS_TERMINATED(type))
 
754
                txn_NewTransaction();
 
755
                
 
756
        unlock_(this);
 
757
        
 
758
        exit_();
 
759
}
 
760
 
 
761
void  MSTrans::txn_AddTransaction(uint8_t tran_type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
 
762
{
 
763
        MSTransRec rec = {0,0,0,0,0,0,0}; // This must be set to zero so that the checksum will be valid. 
 
764
        MSDiskTransRec drec;
 
765
        uint64_t new_offset = txn_EOL;
 
766
        bool do_flush = true;
 
767
 
 
768
        enter_();
 
769
 
 
770
        
 
771
        // Check that the log is not already full.
 
772
        if (txn_IsFull()) {             
 
773
                if (!txn_HaveOverflow) { // The first overflow record: update the header.
 
774
                        CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_OVERFLOW);
 
775
                        txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
 
776
 
 
777
                        CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
778
                        CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
779
                        txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
 
780
 
 
781
                        txn_File->flush();
 
782
                        txn_File->sync();
 
783
                        txn_HaveOverflow = true;
 
784
                        txn_OverflowCount++;
 
785
                        txn_Overflow =  txn_MaxRecords;         
 
786
                }
 
787
                
 
788
                new_offset = txn_Overflow;
 
789
        }
 
790
                
 
791
        rec.tr_id = self->myTID ;
 
792
        rec.tr_type = tran_type;
 
793
        rec.tr_db_id = db_id;
 
794
        rec.tr_tab_id = tab_id;
 
795
        rec.tr_blob_id = blob_id;
 
796
        rec.tr_blob_ref_id = blob_ref_id;
 
797
        
 
798
        if (self->myStartTxn) {
 
799
                TRANS_SET_START(rec.tr_type);
 
800
                self->myStartTxn = false;
 
801
        }
 
802
                
 
803
        if (autocommit) {
 
804
                TRANS_SET_AUTOCOMMIT(rec.tr_type);
 
805
        }
 
806
                
 
807
#ifdef TRACE_ALL
 
808
if (txn_debug_log){
 
809
char *ttype, *cmt;
 
810
switch (TRANS_TYPE(rec.tr_type)) {
 
811
        case MS_ReferenceTxn:
 
812
                ttype = "+";
 
813
                break;
 
814
        case MS_DereferenceTxn:
 
815
                ttype = "-";
 
816
                break;
 
817
        case MS_RollBackTxn:
 
818
                ttype = "rb";
 
819
                rec.tr_blob_ref_id = 0;
 
820
                break;
 
821
        case MS_RecoveredTxn:
 
822
                ttype = "rcov";
 
823
                rec.tr_blob_ref_id = 0;
 
824
                break;
 
825
        default:
 
826
                ttype = "???";
 
827
}
 
828
 
 
829
if (TRANS_IS_TERMINATED(rec.tr_type))
 
830
        cmt = "c";
 
831
else
 
832
        cmt = "";
 
833
                        
 
834
fprintf(txn_debug_log, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" %"PRIu64" %"PRIu64"  %"PRIu64" %d\n", self->myTID, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, txn_Start, txn_EOL, new_offset, txn_HaveOverflow);
 
835
}
 
836
#endif
 
837
 
 
838
        rec.tr_check = txn_Checksum; 
 
839
        
 
840
        // Calculate the records checksum.
 
841
        rec.tr_check = checksum((uint8_t*)&rec, sizeof(rec));
 
842
        
 
843
        // Write the record to disk.
 
844
        SET_DISK_TRANSREC(&drec, &rec);
 
845
#ifdef CRASH_TEST
 
846
 
 
847
        if (trans_test_crash_point == 9) { // do a partial write before crashing
 
848
                txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec)/2 );
 
849
                CRASH_POINT(9);
 
850
        } else
 
851
                txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
 
852
#else   
 
853
        txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
 
854
#endif
 
855
CRASH_POINT(3);
 
856
        // There is no need to sync if the transaction is still running.
 
857
        if (TRANS_IS_TERMINATED(tran_type)) {
 
858
                CRASH_POINT(4); // This crash will result in a verify error because the txn was committed to the log but not the database.
 
859
                txn_File->flush();
 
860
                txn_File->sync();
 
861
                do_flush = false;
 
862
        }
 
863
        
 
864
        if (!txn_HaveOverflow) { // No need to update the header if overflowing.
 
865
                uint64_t rec_offset = txn_EOL;
 
866
                
 
867
                txn_EOL = new_offset;
 
868
                txn_EOL++;
 
869
                
 
870
                if (txn_EOL == txn_MaxRecords) {
 
871
                        // The eol has rolled over.
 
872
                        txn_EOL = 0;            
 
873
                }
 
874
 
 
875
                txn_EOLCheckPoint--;
 
876
                if ((!txn_EOLCheckPoint) || !txn_EOL) {
 
877
                        
 
878
                        // Flush the previouse write if required before updating the header.
 
879
                        // This is just in case it crashes during the sync to make sure that the
 
880
                        // header information is correct for the data on disk. If the crash occurred
 
881
                        // between writing the header and the record the header on disk would be wrong.
 
882
                        if (do_flush) {
 
883
                                txn_File->flush();
 
884
                                txn_File->sync();
 
885
                        }
 
886
                        
 
887
                        txn_ResetEOL();
 
888
                }
 
889
                
 
890
                txn_TransCache->tc_AddRec(rec_offset, &rec, self->myTransRef);
 
891
                
 
892
                if (txn_GetNumRecords() > txn_HighWaterMark)
 
893
                        txn_HighWaterMark = txn_GetNumRecords();
 
894
                        
 
895
        } else { // Ovewrflow
 
896
                txn_TransCache->tc_AddRec(txn_Overflow, &rec, self->myTransRef);
 
897
                txn_Overflow++;
 
898
                if (txn_Overflow > txn_HighWaterMark)
 
899
                        txn_HighWaterMark = txn_Overflow;
 
900
        }
 
901
        
 
902
        ASSERT(txn_EOL < txn_MaxRecords);
 
903
        ASSERT(txn_Start < txn_MaxRecords);
 
904
        exit_();
 
905
}
 
906
 
 
907
uint64_t MSTrans::txn_GetSize()         
 
908
{
 
909
        return sizeof(MSDiskTransHeadRec) + txn_MaxRecords * sizeof(MSDiskTransRec);
 
910
}
 
911
 
 
912
//---------------
 
913
void MSTrans::txn_NewTransaction()
 
914
 
915
        enter_();
 
916
 
 
917
        self->myTID = 0;        // This will be assigned when the first record is written.
 
918
        
 
919
        exit_();
 
920
}
 
921
 
 
922
//---------------
 
923
void MSTrans::txn_PerformIdleTasks()
 
924
{
 
925
        enter_();
 
926
        
 
927
        if (txn_TransCache->tc_ShoulReloadCache()) {
 
928
                txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload());
 
929
                txn_TransCache->tc_CompleteCacheReload();
 
930
                exit_();
 
931
        }
 
932
        
 
933
        // During backup the reader is suspended. This may need to be changed
 
934
        // if we decide to actually do something here.
 
935
        txn_reader->suspendedWait(1000);
 
936
        exit_();
 
937
}
 
938
 
 
939
//---------------
 
940
void MSTrans::txn_ResetReadPosition(uint64_t pos)
 
941
{       
 
942
        bool rollover = (pos < txn_Start);
 
943
        enter_();
 
944
        
 
945
        if (pos >= txn_MaxRecords) { // Start of overflow
 
946
                lock_(this);
 
947
                
 
948
                // Overflow has occurred and the circular list is now empty 
 
949
                // so expand the list to include the overflow and 
 
950
                // reset txn_Start and txn_EOL
 
951
                txn_Start = txn_MaxRecords;
 
952
                txn_MaxRecords = txn_Overflow;
 
953
                txn_EOL = 0;
 
954
                txn_HaveOverflow = false;
 
955
                txn_Overflow = 0;
 
956
                
 
957
                CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
 
958
                CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
 
959
                txn_File->write(&(txn_DiskHeader.th_overflow_1),        offsetof(MSDiskTransHeadRec, th_overflow_1),    1);
 
960
                txn_File->write(&(txn_DiskHeader.th_list_size_8),       offsetof(MSDiskTransHeadRec, th_list_size_8),   8);
 
961
                                
 
962
                txn_ResetEOL();
 
963
                
 
964
                unlock_(this);
 
965
        } else
 
966
                txn_Start = pos;        
 
967
        
 
968
        ASSERT(txn_Start <= txn_MaxRecords);
 
969
        
 
970
        if (!rollover)
 
971
                txn_StartCheckPoint -= (pos - txn_Start);
 
972
        
 
973
        // Flush the header if the read position has rolled over or it is time. 
 
974
        if ( rollover || (txn_StartCheckPoint <=0)) {
 
975
                lock_(this);
 
976
                CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
 
977
                CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
 
978
                txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
 
979
CRASH_POINT(5);
 
980
                txn_File->flush();
 
981
                txn_File->sync();
 
982
                txn_StartCheckPoint = txn_MaxCheckPoint;
 
983
                unlock_(this);
 
984
        }
 
985
        
 
986
CRASH_POINT(6);
 
987
        
 
988
        if (TRANS_CAN_RESIZE) 
 
989
                txn_ResizeLog();
 
990
                
 
991
        exit_();
 
992
}
 
993
//---------------
 
994
bool MSTrans::txn_haveNextTransaction() 
 
995
{
 
996
        bool terminated = false;
 
997
        TRef ref;
 
998
        
 
999
        txn_TransCache->tc_GetTransaction(&ref, &terminated);
 
1000
        
 
1001
        return terminated;
 
1002
}
 
1003
 
 
1004
//---------------
 
1005
void MSTrans::txn_GetNextTransaction(MSTransPtr tran, MS_TxnState *state)
 
1006
{
 
1007
        bool terminated;
 
1008
        uint64_t log_position;
 
1009
        enter_();
 
1010
        
 
1011
        ASSERT(txn_reader == self);
 
1012
        lock_(txn_reader);
 
1013
        
 
1014
        do {
 
1015
                // Get the next completed transaction.
 
1016
                // this will suspend the current thread, which is assumed
 
1017
                // to be the log reader, until one is available.
 
1018
                while ((!txn_IsTxnValid) && !self->myMustQuit) {
 
1019
 
 
1020
                        // wait until backup has completed.
 
1021
                        while (txn_Doingbackup && !self->myMustQuit)
 
1022
                                txn_PerformIdleTasks();
 
1023
        
 
1024
                        if (txn_TransCache->tc_GetTransaction(&txn_CurrentTxn, &terminated) && terminated) {
 
1025
                                txn_IsTxnValid = true;
 
1026
                                txn_TxnIndex = 0;
 
1027
                        } else
 
1028
                                txn_PerformIdleTasks();
 
1029
                }
 
1030
                
 
1031
                if (self->myMustQuit)
 
1032
                        break;
 
1033
                        
 
1034
                if (txn_TransCache->tc_GetRecAt(txn_CurrentTxn, txn_TxnIndex++, tran, state)) 
 
1035
                        break;
 
1036
                        
 
1037
CRASH_POINT(7);
 
1038
                txn_TransCache->tc_FreeTransaction(txn_CurrentTxn);
 
1039
CRASH_POINT(8);
 
1040
                if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) {
 
1041
                        txn_ResetReadPosition(log_position);
 
1042
                }else{
 
1043
                        if (txn_TransCache->tc_ShoulReloadCache()) {
 
1044
                                uint64_t pos = txn_TransCache->tc_StartCacheReload();
 
1045
                                txn_ResetReadPosition(pos);
 
1046
                                txn_LoadTransactionCache(pos);
 
1047
                                txn_TransCache->tc_CompleteCacheReload();
 
1048
                        } else {
 
1049
                                // Lock the object to prevent writer thread updates while I check again.
 
1050
                                // This is to ensure that txn_EOL is not changed between the call to
 
1051
                                // tc_GetTransactionStartPosition() and setting the read position.
 
1052
                                lock_(this);
 
1053
                                if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) 
 
1054
                                        txn_ResetReadPosition(log_position);
 
1055
                                else
 
1056
                                        txn_ResetReadPosition(txn_EOL);
 
1057
                                unlock_(this);
 
1058
                        }
 
1059
                }
 
1060
                
 
1061
                txn_IsTxnValid = false;
 
1062
                        
 
1063
        } while (1);
 
1064
        
 
1065
        unlock_(txn_reader);
 
1066
        exit_();
 
1067
}
 
1068
 
 
1069
 
 
1070
void MSTrans::txn_GetStats(MSTransStatsPtr stats)
 
1071
{
 
1072
        
 
1073
        if (txn_HaveOverflow) {
 
1074
                stats->ts_IsOverflowing = true;
 
1075
                stats->ts_LogSize = txn_Overflow;
 
1076
        } else {
 
1077
                stats->ts_IsOverflowing = false;
 
1078
                stats->ts_LogSize = txn_GetNumRecords();
 
1079
        }
 
1080
        stats->ts_PercentFull = (stats->ts_LogSize * 100) / CS_GET_DISK_8(txn_DiskHeader.th_requested_list_size_8);
 
1081
 
 
1082
        stats->ts_MaxSize = txn_HighWaterMark;
 
1083
        stats->ts_OverflowCount = txn_OverflowCount;
 
1084
        
 
1085
        stats->ts_TransCacheSize = txn_TransCache->tc_GetCacheUsed();
 
1086
        stats->ts_PercentTransCacheUsed = txn_TransCache->tc_GetPercentCacheUsed();
 
1087
        stats->ts_PercentCacheHit = txn_TransCache->tc_GetPercentCacheHit();
 
1088
}
 
1089
 
 
1090
void MSTrans::txn_SetCacheSize(uint32_t new_size)
 
1091
{
 
1092
        enter_();
 
1093
        // Important lock order. Writer threads never lock the reader but the reader
 
1094
        // may lock this object so always lock the reader first.
 
1095
        lock_(txn_reader);
 
1096
        lock_(this);
 
1097
 
 
1098
        CS_SET_DISK_4(txn_DiskHeader.th_requested_cache_size_4, new_size);
 
1099
        
 
1100
        txn_File->write(&(txn_DiskHeader.th_requested_cache_size_4), offsetof(MSDiskTransHeadRec, th_requested_cache_size_4), 4);
 
1101
        txn_File->flush();
 
1102
        txn_File->sync();
 
1103
 
 
1104
        txn_TransCache->tc_SetSize(new_size);
 
1105
 
 
1106
        unlock_(this);
 
1107
        unlock_(txn_reader);
 
1108
        exit_();
 
1109
}
 
1110
 
 
1111
void MSTrans::txn_SetLogSize(uint64_t new_size)
 
1112
{
 
1113
        enter_();
 
1114
        
 
1115
        // Important lock order. Writer threads never lock the reader but the reader
 
1116
        // may lock this object so always lock the reader first.
 
1117
        lock_(txn_reader);
 
1118
        lock_(this);
 
1119
        
 
1120
        txn_ReqestedMaxRecords = (new_size - sizeof(MSDiskTransHeadRec)) / sizeof(MSDiskTransRec);
 
1121
        
 
1122
        if (txn_ReqestedMaxRecords < 10)
 
1123
                txn_ReqestedMaxRecords = 10;
 
1124
        
 
1125
        CS_SET_DISK_8(txn_DiskHeader.th_requested_list_size_8, txn_ReqestedMaxRecords);
 
1126
        
 
1127
        txn_File->write(&(txn_DiskHeader.th_requested_list_size_8), offsetof(MSDiskTransHeadRec, th_requested_list_size_8), 8);
 
1128
        txn_File->flush();
 
1129
        txn_File->sync();
 
1130
        
 
1131
        unlock_(this);
 
1132
        unlock_(txn_reader);
 
1133
        
 
1134
        exit_();
 
1135
}
 
1136
 
 
1137
// A helper class for resetting database IDs in the transaction log.
 
1138
class DBSearchTXNLog : ReadTXNLog {
 
1139
        public:
 
1140
        DBSearchTXNLog(MSTrans *log): ReadTXNLog(log), sdb_db_id(0), sdb_isDirty(false) {}
 
1141
        
 
1142
        uint32_t sdb_db_id; 
 
1143
        bool sdb_isDirty;
 
1144
        
 
1145
        virtual bool rl_CanContinue() { return true;}
 
1146
        virtual void rl_Load(uint64_t log_position, MSTransPtr rec) 
 
1147
        {
 
1148
                if  (rec->tr_db_id == sdb_db_id) {
 
1149
                        sdb_isDirty = true;
 
1150
                        rec->tr_db_id = 0;
 
1151
                        rl_Store(log_position, rec);
 
1152
                } 
 
1153
        }
 
1154
        
 
1155
        void SetDataBaseIDToZero(uint32_t db_id)
 
1156
        {
 
1157
                sdb_db_id = db_id;
 
1158
                rl_ReadLog(rl_log->txn_GetStartPosition(), false);
 
1159
                if (sdb_isDirty)
 
1160
                        rl_Flush();
 
1161
        }
 
1162
};
 
1163
 
 
1164
// Dropping the database from the transaction log just involves
 
1165
// scanning the log and setting the database id of any transactions 
 
1166
// involving the dropped database to zero.
 
1167
void MSTrans::txn_dropDatabase(uint32_t db_id)
 
1168
{
 
1169
        enter_();
 
1170
        
 
1171
        // Important lock order. Writer threads never lock the reader but the reader
 
1172
        // may lock this object so always lock the reader first.
 
1173
        lock_(txn_reader);
 
1174
        lock_(this);
 
1175
        
 
1176
        // Clear any transaction records in the cache for the dropped database;
 
1177
        txn_TransCache->tc_dropDatabase(db_id);
 
1178
        
 
1179
        // Scan the log setting the database ID for any record belonging to the
 
1180
        // dropped database to zero. 
 
1181
        DBSearchTXNLog searchLog(this);
 
1182
        
 
1183
        searchLog.SetDataBaseIDToZero(db_id);
 
1184
                
 
1185
        unlock_(this);
 
1186
        unlock_(txn_reader);
 
1187
        exit_();
 
1188
}
 
1189
 
 
1190
#ifdef DEBUG    
 
1191
void MSTrans::txn_DumpLog(const char *file)
 
1192
{
 
1193
        size_t  size, read_start = 0;
 
1194
        FILE *fptr;
 
1195
        enter_();
 
1196
        
 
1197
        fptr = fopen(file, "w+");
 
1198
        if (!fptr) {
 
1199
                perror(file);
 
1200
                return;
 
1201
        }
 
1202
        
 
1203
        if (txn_Overflow)
 
1204
                size = txn_Overflow;
 
1205
        else
 
1206
                size = txn_MaxRecords;
 
1207
        
 
1208
        // Dump all the records
 
1209
        while (size) {
 
1210
                MSDiskTransRec diskRecords[1000];
 
1211
                uint32_t read_size;
 
1212
                off64_t offset;
 
1213
                
 
1214
                if (size > 1000) 
 
1215
                        read_size = 1000 ;
 
1216
                else
 
1217
                        read_size = size ;
 
1218
                
 
1219
                // Read the next block of records.
 
1220
                offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);              
 
1221
                txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
 
1222
                
 
1223
                for (uint32_t i = 0; i < read_size; i++) {
 
1224
                        const char *ttype, *cmt;
 
1225
                        MSTransRec rec;
 
1226
                        MSDiskTransPtr drec = diskRecords + i;
 
1227
                        GET_DISK_TRANSREC(&rec, drec);
 
1228
                        
 
1229
                        switch (TRANS_TYPE(rec.tr_type)) {
 
1230
                                case MS_ReferenceTxn:
 
1231
                                        ttype = "+";
 
1232
                                        break;
 
1233
                                case MS_DereferenceTxn:
 
1234
                                        ttype = "-";
 
1235
                                        break;
 
1236
                                case MS_RollBackTxn:
 
1237
                                        ttype = "rb";
 
1238
                                        rec.tr_blob_ref_id = 0;
 
1239
                                        break;
 
1240
                                case MS_RecoveredTxn:
 
1241
                                        ttype = "rcov";
 
1242
                                        rec.tr_blob_ref_id = 0;
 
1243
                                        break;
 
1244
                                default:
 
1245
                                        ttype = "???";
 
1246
                        }
 
1247
                        
 
1248
                        if (TRANS_IS_TERMINATED(rec.tr_type))
 
1249
                                cmt = "c";
 
1250
                        else
 
1251
                                cmt = "";
 
1252
                        
 
1253
                        
 
1254
                        fprintf(fptr, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" \t %s %s %s\n", rec.tr_id, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, 
 
1255
                                ((read_start + i) == txn_Start) ? "START":"",
 
1256
                                ((read_start + i) == txn_EOL) ? "EOL":"",
 
1257
                                ((read_start + i) == txn_MaxRecords) ? "OverFlow":""
 
1258
                                );
 
1259
                }
 
1260
                
 
1261
                size -= read_size;
 
1262
                read_start += read_size;
 
1263
        }
 
1264
        fclose(fptr);
 
1265
        exit_();
 
1266
}       
 
1267
 
 
1268
#endif
 
1269