1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25
* PBMS transaction handling.
27
* PBMS uses 1 circular transaction log. All BLOB reference operations are written to this log
28
* and are applied to the repository when committed. There is 1 thread dedicated to reading the
29
* transaction log and applying the changes. During an engine level backup this thread is suspended
30
* so that no transactions will be applied to the repository files as they are backed up.
33
#include "cslib/CSConfig.h"
38
#include "cslib/CSGlobal.h"
39
#include "cslib/CSStrUtil.h"
40
#include "cslib/CSStorage.h"
42
#include "trans_log_ms.h"
43
#include "trans_cache_ms.h"
46
// Selects which CRASH_POINT(p) call site should crash; compared against p in the test variant of the macro below.
uint32_t trans_test_crash_point;
47
// Test variant: when p equals trans_test_crash_point, print the file, line and the log start/eol
// positions, then write through a NULL pointer to force a crash.
#define CRASH_POINT(p) { if (p == trans_test_crash_point) { char *ptr = NULL; printf("Crash on demand at: %s(%d), start: %"PRIu64", eol: %"PRIu64"\n", __FILE__, __LINE__, txn_Start, txn_EOL); *ptr = 88;}}
49
// No-op variant. NOTE(review): the conditional that selects between the two variants is not visible in this view.
#define CRASH_POINT(p)
52
// Magic number stored in th_magic_4 and checked when an existing log is opened.
#define MS_TRANS_LOG_MAGIC 0xA6E7D7B3
53
// Log format version stored in th_version_2 and checked when an existing log is opened.
#define MS_TRANS_LOG_VERSION 1
54
// Values of th_recovered_1: RECOVERED is written when the log is created and when it is closed,
// NOT_RECOVERED is written once the log has been opened.
#define MS_TRANS_LOG_RECOVERED 0XA1
55
#define MS_TRANS_LOG_NOT_RECOVERED 0XA2
56
// Values of th_overflow_1: OVERFLOW is written when the first overflow record is added,
// NO_OVERFLOW when the log is created or the overflow has been absorbed into the log.
#define MS_TRANS_NO_OVERFLOW 0XB1
57
#define MS_TRANS_OVERFLOW 0XB2
59
// Initial value of txn_MaxCheckPoint for a newly created log.
#define DFLT_TRANS_CHECKPOINT_THRESHOLD 1024
61
// Number of records preallocated in a newly created log.
#define DFLT_TRANS_LOG_LIST_SIZE (1024 * 10)
62
// Size passed to MSTransCache::newMSTransCache() for a newly created log.
#define DFLT_TRANS_CACHE_SIZE (500)
64
// True when the current log size differs from the requested size, eol is not behind start
// (the log has not wrapped) and there is no overflow.
#define TRANS_CAN_RESIZE ((txn_MaxRecords != txn_ReqestedMaxRecords) && (txn_EOL >= txn_Start) && !txn_HaveOverflow)
66
// Disk format of one transaction log record. Converted to and from the in-memory
// record by SET_DISK_TRANSREC() and GET_DISK_TRANSREC().
typedef struct MSDiskTrans {
67
CSDiskValue4 dtr_id_4; // The transaction ID
68
CSDiskValue1 dtr_type_1; // The transaction type. If the first bit is set then the transaction is an autocommit.
69
CSDiskValue1 dtr_check_1; // The transaction record checksum.
70
CSDiskValue4 dtr_db_id_4; // The database ID for the operation.
71
CSDiskValue4 dtr_tab_id_4; // The table ID for the operation.
72
CSDiskValue8 dtr_blob_id_8; // The blob ID for the operation.
73
CSDiskValue8 dtr_blob_ref_id_8; // The blob reference id.
74
} MSDiskTransRec, *MSDiskTransPtr;
76
// Copies each field of the in-memory record (s) into the disk format record (d).
// Both arguments are pointers.
#define SET_DISK_TRANSREC(d, s) { \
77
CS_SET_DISK_4((d)->dtr_id_4, (s)->tr_id);\
78
CS_SET_DISK_1((d)->dtr_type_1, (s)->tr_type);\
79
CS_SET_DISK_1((d)->dtr_check_1, (s)->tr_check);\
80
CS_SET_DISK_4((d)->dtr_db_id_4, (s)->tr_db_id);\
81
CS_SET_DISK_4((d)->dtr_tab_id_4, (s)->tr_tab_id);\
82
CS_SET_DISK_8((d)->dtr_blob_id_8, (s)->tr_blob_id);\
83
CS_SET_DISK_8((d)->dtr_blob_ref_id_8, (s)->tr_blob_ref_id);\
86
// Copies each field of the disk format record (d) into the in-memory record (s).
// Both arguments are pointers. Note the argument order is (destination, source).
#define GET_DISK_TRANSREC(s, d) { \
87
(s)->tr_id = CS_GET_DISK_4((d)->dtr_id_4);\
88
(s)->tr_type = CS_GET_DISK_1((d)->dtr_type_1);\
89
(s)->tr_check = CS_GET_DISK_1((d)->dtr_check_1);\
90
(s)->tr_db_id = CS_GET_DISK_4((d)->dtr_db_id_4);\
91
(s)->tr_tab_id = CS_GET_DISK_4((d)->dtr_tab_id_4);\
92
(s)->tr_blob_id = CS_GET_DISK_8((d)->dtr_blob_id_8);\
93
(s)->tr_blob_ref_id = CS_GET_DISK_8((d)->dtr_blob_ref_id_8);\
96
// Computes a one byte checksum over the len bytes at data.
// NOTE(review): the loop header and part of the loop body are not visible in this view;
// only the statements shown below are documented.
static uint8_t checksum(uint8_t *data, size_t len)
98
register uint32_t sum = 0, g;
101
// Start at the last byte of the buffer.
chk = data + len - 1;
103
// Shift the accumulator left by 4 bits and add the current byte.
sum = (sum << 4) + *chk;
104
// If any of the top 4 bits are set, fold them back into the low part of the sum.
if ((g = sum & 0xF0000000)) {
105
sum = sum ^ (g >> 24);
110
// XOR the four bytes of the 32 bit sum together to produce the 8 bit result.
return (uint8_t) (sum ^ (sum >> 24) ^ (sum >> 16) ^ (sum >> 8));
115
// NOTE(review): these appear to be entries of the MSTrans constructor's member initializer list;
// the constructor header and some entries are not visible in this view.
// Each visible member is initialized to 0, false or NULL.
txn_MaxCheckPoint(0),
116
txn_Doingbackup(false),
118
txn_IsTxnValid(false),
121
txn_StartCheckPoint(0),
122
txn_TransCache(NULL),
123
txn_BlockingTransaction(0),
125
txn_EOLCheckPoint(0),
127
txn_ReqestedMaxRecords(0),
128
txn_HighWaterMark(0),
129
txn_OverflowCount(0),
131
txn_Recovered(false),
132
txn_HaveOverflow(false),
144
txn_TransCache->release();
148
// Writes the in-memory log state back to the header on disk: the next transaction ID,
// then the header fields from th_start_8 to the end of the header, and finally the
// recovered flag.
// NOTE(review): any guards, flush/sync calls and the closing of the file are not visible in this view.
void MSTrans::txn_Close()
152
// Set the header to indicate that the log has not been closed properly.
// NOTE(review): the comment above does not match the statement that follows, which
// stores the next transaction ID (txn_MaxTID) in the header.
153
CS_SET_DISK_4(txn_DiskHeader.th_next_txn_id_4, txn_MaxTID);
154
txn_File->write(&(txn_DiskHeader.th_next_txn_id_4), offsetof(MSDiskTransHeadRec, th_next_txn_id_4), 4 );
156
// Store the current start, eol and checksum seed, then write everything from
// th_start_8 to the end of the header in one write.
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
157
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
158
CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
159
txn_File->write(&(txn_DiskHeader.th_start_8),
160
offsetof(MSDiskTransHeadRec, th_start_8),
161
sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
167
// Write the recovered flag separately just in case of a crash during the write operation.
168
CS_SET_DISK_1(txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
169
txn_File->write(&(txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1 );
179
// Called by txn_NewMSTrans() with the created or opened log file.
// NOTE(review): the body is not visible in this view; presumably it stores tr_file in txn_File - confirm.
void MSTrans::txn_SetFile(CSFile *tr_file)
186
// Debug output stream: opened as "log_dump.txt" in txn_NewMSTrans() and written to in txn_AddTransaction().
static FILE *txn_debug_log;
189
// Creates an MSTrans object for the transaction log file at log_path.
// If the file does not exist it is created, filled with DFLT_TRANS_LOG_LIST_SIZE zeroed
// records and given a new header. If it exists, the header is read and validated (magic,
// version, overflow flag, recovered flag, file size) and the in-memory state is loaded from it.
// The header is then marked as not recovered and the transaction cache is loaded from txn_Start.
// Throws a CSException for a bad header or an unexpected file size.
// NOTE(review): the use of dump_log, several conditions and the return statement are not visible in this view.
MSTrans *MSTrans::txn_NewMSTrans(const char *log_path, bool dump_log)
191
MSTrans *trans = NULL;
198
new_(trans, MSTrans());
201
path = CSPath::newPath(log_path);
207
if (!path->exists()) { // Create the transaction log.
208
CSFile *tr_file = path->createFile(CSFile::CREATE);
211
log_size = DFLT_TRANS_LOG_LIST_SIZE * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
213
// Preallocate the log space and initialize it.
214
// A zero initialized buffer of 1024 records is written repeatedly, starting just after the header.
MSDiskTransRec recs[1024] = {};
215
off64_t offset = sizeof(MSDiskTransHeadRec);
216
uint64_t num_records = DFLT_TRANS_LOG_LIST_SIZE;
219
while (num_records) {
220
if (num_records < 1024)
224
tr_file->write(recs, offset, size * sizeof(MSDiskTransRec));
225
offset += size * sizeof(MSDiskTransRec);
229
// Defaults for a new log.
trans->txn_MaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
230
trans->txn_ReqestedMaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
231
trans->txn_MaxCheckPoint = DFLT_TRANS_CHECKPOINT_THRESHOLD;
232
trans->txn_MaxTID = 1;
234
// Initialize the log header.
235
CS_SET_DISK_4(trans->txn_DiskHeader.th_magic_4, MS_TRANS_LOG_MAGIC);
236
CS_SET_DISK_2(trans->txn_DiskHeader.th_version_2, MS_TRANS_LOG_VERSION);
238
CS_SET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4, trans->txn_MaxTID);
240
CS_SET_DISK_2(trans->txn_DiskHeader.th_check_point_2, trans->txn_MaxCheckPoint);
242
CS_SET_DISK_8(trans->txn_DiskHeader.th_list_size_8, trans->txn_MaxRecords);
243
CS_SET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8, trans->txn_ReqestedMaxRecords);
245
CS_SET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4, DFLT_TRANS_CACHE_SIZE);
247
CS_SET_DISK_8(trans->txn_DiskHeader.th_start_8, 0);
248
CS_SET_DISK_8(trans->txn_DiskHeader.th_eol_8, 0);
250
CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
251
CS_SET_DISK_1(trans->txn_DiskHeader.th_checksum_1, 1);
252
CS_SET_DISK_1(trans->txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
254
// Write the complete header at offset 0.
tr_file->write(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec));
256
trans->txn_SetFile(tr_file);
258
trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
260
trans->txn_TransCache = MSTransCache::newMSTransCache(DFLT_TRANS_CACHE_SIZE);
261
} else { // The transaction log already exists
262
bool overflow = false, recovered = false;
264
CSFile *tr_file = path->createFile(CSFile::DEFAULT); // Open read/write
267
// Read the log header:
268
if (tr_file->read(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec), 0) < sizeof(MSDiskTransHeadRec)) {
274
// check the log header:
275
if (CS_GET_DISK_4(trans->txn_DiskHeader.th_magic_4) != MS_TRANS_LOG_MAGIC)
276
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_HEADER_MAGIC);
278
if (CS_GET_DISK_2(trans->txn_DiskHeader.th_version_2) != MS_TRANS_LOG_VERSION)
279
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_VERSION_TOO_NEW);
282
// The overflow flag must be one of the two known values, otherwise the header is rejected.
if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_NO_OVERFLOW)
284
else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_OVERFLOW)
287
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
290
// The recovered flag must be one of the two known values, otherwise the header is rejected.
if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_NOT_RECOVERED)
292
else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_RECOVERED)
295
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
297
// Check that the log is the expected size.
298
log_size = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8) * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
300
// The file may never be smaller than expected, and may only be larger if there is overflow.
if ((log_size > tr_file->getEOF()) ||
301
((log_size < tr_file->getEOF()) && !overflow)){
303
char buffer[CS_EXC_MESSAGE_SIZE];
304
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unexpected transaction log size: ");
305
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, path->getCString());
306
CSException::throwException(CS_CONTEXT, CS_ERR_BAD_FILE_HEADER, buffer);
309
trans->txn_MaxTID = CS_GET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4);
311
// Looks good, we will assume it is a valid log file.
312
trans->txn_TransCache = MSTransCache::newMSTransCache(CS_GET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4));
315
trans->txn_SetFile(tr_file);
317
// Load the in-memory state from the header.
trans->txn_MaxCheckPoint = CS_GET_DISK_2(trans->txn_DiskHeader.th_check_point_2);
319
trans->txn_MaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8);
320
trans->txn_ReqestedMaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8);
322
trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
323
trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
324
trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
325
trans->txn_HaveOverflow = overflow;
327
// txn_Overflow is either derived from the file size (total number of records in the file) or 0.
// NOTE(review): the condition choosing between the two assignments is not visible; presumably it is the overflow flag.
trans->txn_Overflow = (tr_file->getEOF() - sizeof(MSDiskTransHeadRec)) /sizeof(MSDiskTransRec);
329
trans->txn_Overflow = 0;
333
printf("Recovering overflow log\n");
336
// Dump the log to a file whose name starts with the current time.
snprintf(name, 100, "%dms-trans-log.dump", (int)time(NULL));
337
trans->txn_DumpLog(name);
340
// Recover the log if required.
342
trans->txn_Recover();
347
trans->txn_Recovered = true; // Any recovery required has been completed.
349
// The log has been recovered so these values should be valid:
350
trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
351
trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
353
// Set the header to indicate that the log has not been closed properly.
354
// This is reset when the log is closed during shutdown.
355
CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_NOT_RECOVERED);
356
trans->txn_File->write(&(trans->txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1);
358
// Load the transaction records into memory.
359
trans->txn_TransCache->tc_StartCacheReload(true);
360
trans->txn_LoadTransactionCache(trans->txn_Start);
361
trans->txn_TransCache->tc_CompleteCacheReload();
363
if (trans->txn_MaxRecords != trans->txn_ReqestedMaxRecords)
364
trans->txn_ResizeLog(); // Try to resize but it may not be possible yet.
371
// Open the debug log used by txn_AddTransaction(); a failure is reported with perror().
txn_debug_log = fopen("log_dump.txt", "w+");
372
if (!txn_debug_log) {
373
perror("log_dump.txt");
380
// Checks the checksum of a record read from the log.
// The record is checksummed the same way txn_AddTransaction() does it when writing: with
// tr_check temporarily set to the log's checksum seed (txn_Checksum). The stored value is
// restored afterwards, so rec is unchanged on return.
// NOTE(review): the declaration of ok and the return statement are not visible in this view.
bool MSTrans::txn_ValidRecord(MSTransPtr rec)
382
uint8_t check = rec->tr_check;
385
rec->tr_check = txn_Checksum;
386
ok = (checksum((uint8_t*)rec, sizeof(MSTransRec)) == check);
387
rec->tr_check = check;
391
// Reads the record at position index from the log file into rec.
// index is a record number; the header size is added to get the file offset.
void MSTrans::txn_GetRecordAt(uint64_t index, MSTransPtr rec)
396
// Read 1 record from the log and convert it from disk format.
397
offset = sizeof(MSDiskTransHeadRec) + index * sizeof(MSDiskTransRec);
398
txn_File->read(&drec, offset, sizeof(MSDiskTransRec), sizeof(MSDiskTransRec));
399
GET_DISK_TRANSREC(rec, &drec);
402
// Recovery involves finding the start of the first record and the eof
403
// position. The positions will be found at or after the position stored
405
// Recovers the log: scans forward from the eol stored in the header for the end of the valid
// records, moves txn_Start forward to a record that starts a transaction, stores both positions
// in the in-memory header, reloads the transaction cache, and then walks the cached
// transactions calling txn_AddTransaction(MS_RecoveredTxn).
// NOTE(review): several conditions, loop exits and the header write are not visible in this view.
void MSTrans::txn_Recover()
407
MSTransRec rec = {0,0,0,0,0,0,0};
408
uint64_t original_eol = txn_EOL;
412
printf("Recovering transaction log!\n");
416
// Search for the last valid record in the log starting from the last
417
// known position stored in the header.
418
for (; txn_EOL < txn_MaxRecords; txn_EOL++) {
419
txn_GetRecordAt(txn_EOL, &rec);
420
if (! txn_ValidRecord(&rec))
424
if (txn_EOL == txn_MaxRecords) {
425
// It looks like all the records in the log are valid?
426
// This is strange but could happen if the crash
427
// occurred just before updating the header as the
428
// eol position rolled over to the top of the log.
434
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
436
// If the actual eol has moved past the recorded start position
437
// then the actual start position must be somewhere beyond
439
if (((original_eol < txn_Start) || (original_eol > txn_EOL)) && (txn_EOL >= txn_Start))
440
txn_Start = txn_EOL +1;
442
// Position the start at the beginning of a transaction.
443
// The search stops at eol if start is before eol, otherwise at the end of the log.
uint64_t end_search = (txn_Start < txn_EOL)? txn_EOL : txn_MaxRecords;
444
for (; txn_Start < end_search; txn_Start++) {
445
txn_GetRecordAt(txn_Start, &rec);
446
if (TRANS_IS_START(rec.tr_type))
450
if (txn_Start == end_search)
453
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
455
txn_TransCache->tc_SetRecovering(true);
456
// Load the transaction records into the cache.
457
txn_TransCache->tc_StartCacheReload(true);
458
txn_LoadTransactionCache(txn_Start);
459
txn_TransCache->tc_CompleteCacheReload();
461
// Now go through all the transactions and add rollbacks for any
462
// unterminated transactions.
465
while (txn_TransCache->tc_GetTransaction(&ref, &terminated)) {
467
txn_MaxTID = txn_TransCache->tc_GetTransactionID(ref); // Save the TID of the last transaction.
469
// Make the transaction being recovered the calling thread's current transaction so that
// txn_AddTransaction() writes the record with its ID and cache reference.
self->myTID = txn_MaxTID;
470
self->myTransRef = ref;
471
self->myStartTxn = false;
472
txn_AddTransaction(MS_RecoveredTxn);
475
txn_TransCache->tc_FreeTransaction(ref);
477
// Load the next block of transactions into the cache.
478
// This needs to be done after each tc_GetTransaction() to make sure
479
// that if the transaction terminator is somewhere in the log
480
// it will get read even if the cache is completely full.
481
if (txn_TransCache->tc_ShoulReloadCache()) {
482
txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload(true));
483
txn_TransCache->tc_CompleteCacheReload();
488
txn_TransCache->tc_SetRecovering(false);
489
self->myTransRef = 0;
491
// Update the header again in case rollbacks have been added.
492
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
497
// Tells rl_ReadLog() whether to keep reading. This implementation forwards the
// question to the log's transaction cache.
bool ReadTXNLog::rl_CanContinue()
499
return rl_log->txn_TransCache->tc_ContinueCacheReload();
502
// Called by rl_ReadLog() for each record read. This implementation adds the record,
// together with its log position, to the log's transaction cache.
void ReadTXNLog::rl_Load(uint64_t log_position, MSTransPtr rec)
504
rl_log->txn_TransCache->tc_AddRec(log_position, rec);
507
// Converts rec to disk format and writes it to the log file at record position log_position.
void ReadTXNLog::rl_Store(uint64_t log_position, MSTransPtr rec)
510
SET_DISK_TRANSREC(&drec, rec);
512
rl_log->txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + log_position * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec));
515
// Calls flush() and then sync() on the log file.
void ReadTXNLog::rl_Flush()
517
rl_log->txn_File->flush();
518
rl_log->txn_File->sync();
521
// Reads log records starting at record position read_start, converts each one from disk
// format and passes it to rl_Load(), for as long as records remain and rl_CanContinue()
// returns true. Positions at or beyond txn_MaxRecords are treated as the overflow area.
// After the main log has been read, the overflow (if any) is read by a recursive call.
// Otherwise, if log_locked is false, the record count is checked again and changed counts
// trigger a recursive call with log_locked set to true.
// NOTE(review): the locking itself and the computation of read_size are not visible in this view.
void ReadTXNLog::rl_ReadLog(uint64_t read_start, bool log_locked)
523
uint64_t size, orig_size;
524
bool reading_overflow = (read_start >= rl_log->txn_MaxRecords);
528
// Get the number of transaction records to be loaded.
529
if (reading_overflow) {
530
orig_size = rl_log->txn_Overflow;
531
size = rl_log->txn_Overflow - read_start;
533
orig_size = rl_log->txn_GetNumRecords();
535
// If reading starts at or after txn_Start, subtract the records already passed;
// otherwise the remaining records are those between read_start and eol.
if (rl_log->txn_Start <= read_start)
536
size = orig_size - (read_start - rl_log->txn_Start);
538
size = rl_log->txn_EOL - read_start;
541
// load all the records
542
while (size && rl_CanContinue()) {
543
MSDiskTransRec diskRecords[1000];
552
// Check if we have reached the wrap around point in the log.
553
// If so, only read up to the last record of the log in this pass.
if ((!reading_overflow) && (rl_log->txn_EOL < read_start) && ((rl_log->txn_MaxRecords - read_start) < read_size))
554
read_size = rl_log->txn_MaxRecords - read_start ;
556
// Read the next block of records.
557
offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);
558
rl_log->txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
560
// Convert the records from disk format and add them to the cache.
561
for (uint32_t i = 0; i < read_size && rl_CanContinue(); i++) {
563
MSDiskTransPtr drec = diskRecords + i;
564
GET_DISK_TRANSREC(&rec, drec);
566
rl_Load(read_start + i, &rec);
570
read_start += read_size;
571
if (read_start == rl_log->txn_MaxRecords)
575
// The main log has been read: continue with the overflow area if there is one.
if (rl_log->txn_HaveOverflow && !reading_overflow) {
576
if (rl_CanContinue())
577
rl_ReadLog(rl_log->txn_MaxRecords, false);
579
} else if (!log_locked) {
580
// The following is intended to prevent the case where a writer
581
// writes an txn record while the cache is full but just after
582
// the reload has completed. If the cache is not yet full we need
583
// to load as many of the new records into cache as possible.
587
if (reading_overflow)
588
new_size = rl_log->txn_Overflow;
590
new_size = rl_log->txn_GetNumRecords();
591
if (rl_CanContinue() && (orig_size != new_size)) {
592
rl_ReadLog(read_start, true);
601
// Loads log records into the transaction cache, starting at record position read_start,
// using a ReadTXNLog reader, and then updates the cache version.
void MSTrans::txn_LoadTransactionCache(uint64_t read_start)
603
ReadTXNLog log(this);
605
log.rl_ReadLog(read_start, false);
606
txn_TransCache->tc_UpdateCacheVersion(); // Signal writes to recheck cache for overflow txn refs.
610
// Moves the log size (txn_MaxRecords) towards the requested size (txn_ReqestedMaxRecords)
// when TRANS_CAN_RESIZE allows it. When shrinking, the amount removed is limited by
// max_resize; when growing, the requested size is used directly. The new size is logged,
// the file is resized with setEOF() and the list size in the header is rewritten.
// NOTE(review): locking, several branches and the tail of the function are not visible in this view.
void MSTrans::txn_ResizeLog()
615
if (TRANS_CAN_RESIZE) {
616
// TRANS_CAN_RESIZE checks that there is no overflow and that the start position
617
// is not past eol. This implies that the range from eol to the end of file doesn't contain
622
uint64_t old_size = txn_MaxRecords;
624
if (txn_MaxRecords > txn_ReqestedMaxRecords) { // Shrink the log
625
uint64_t max_resize = txn_MaxRecords - txn_EOL;
627
// If start equals eol the whole log may be removed; otherwise only the part after eol.
if ( txn_Start == txn_EOL)
628
max_resize = txn_MaxRecords;
630
max_resize = txn_MaxRecords - txn_EOL;
631
if (!txn_Start) // If start is at '0' then the EOL cannot be wrapped.
636
// Never shrink below the requested size.
if (max_resize > (txn_MaxRecords - txn_ReqestedMaxRecords))
637
max_resize = txn_MaxRecords - txn_ReqestedMaxRecords;
639
txn_MaxRecords -= max_resize;
641
txn_MaxRecords = txn_ReqestedMaxRecords; // Grow the log
644
char buffer[CS_EXC_MESSAGE_SIZE];
645
// NOTE(review): in C++11 and later a string literal immediately followed by an identifier
// ("..."PRIu64) is parsed as a user-defined literal; confirm the language standard used to build this file.
snprintf(buffer, CS_EXC_MESSAGE_SIZE, "Resizing the Transaction log from %"PRIu64" to %"PRIu64" \n", old_size, txn_MaxRecords);
646
CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, buffer);
649
CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
651
// Resize the file first, then write the new list size into the header on disk.
txn_File->setEOF(txn_MaxRecords * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec));
652
txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
654
if (txn_Start == txn_EOL) {
657
} else if (txn_MaxRecords == txn_EOL) {
669
// Resets both checkpoint counters to txn_MaxCheckPoint and writes the start, eol and
// checksum seed (everything from th_start_8 to the end of the header) to disk.
// NOTE(review): the statements that change txn_EOL, txn_Start or txn_Checksum before the
// header is written are not visible in this view.
void MSTrans::txn_ResetEOL()
673
txn_EOLCheckPoint = txn_MaxCheckPoint;
674
txn_StartCheckPoint = txn_MaxCheckPoint;
678
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
679
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
680
CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
681
txn_File->write(&(txn_DiskHeader.th_start_8),
682
offsetof(MSDiskTransHeadRec, th_start_8),
683
sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
692
// PRINT_TRANS is either a no-op or a call to printTrans().
// NOTE(review): the conditional selecting between the two definitions is not visible in this view.
#define PRINT_TRANS(tid, a, t)
695
#define PRINT_TRANS(tid, a, t) printTrans(tid, a, t)
696
// Debug helper: prints the transaction ID, the autocommit setting and the name of the
// transaction type to stderr. Types without a visible case keep the name "???".
static void printTrans(uint32_t tid, bool autocommit, MS_Txn type)
698
const char *type_name = "???";
702
type_name = "Rollback";
704
case MS_PartialRollBackTxn:
705
type_name = "PartialRollBack";
708
type_name = "Commit";
710
case MS_ReferenceTxn:
711
type_name = "Reference";
713
case MS_DereferenceTxn:
714
type_name = "Dereference";
716
case MS_RecoveredTxn:
717
type_name = "Recovered";
721
fprintf(stderr, "MSTrans::txn_LogTransaction(%d, autocommit = %s, %s)\n", tid, (autocommit)?"On":"Off", type_name);
726
// Logs one transaction operation for the calling thread: writes the record with
// txn_AddTransaction() and, if the operation is an autocommit or a terminating type,
// calls txn_NewTransaction() afterwards.
// NOTE(review): the switch on type, the locking and the condition under which a new
// transaction ID is assigned are not visible in this view.
void MSTrans::txn_LogTransaction(MS_Txn type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
734
case MS_PartialRollBackTxn:
739
case MS_ReferenceTxn:
740
case MS_DereferenceTxn:
741
case MS_RecoveredTxn:
745
// Set up the calling thread's state for the first record of a transaction:
// take the ID from txn_MaxTID, request a new cache reference and flag the next record as the start.
self->myTID = txn_MaxTID;
746
self->myTransRef = TRANS_CACHE_NEW_REF;
747
self->myStartTxn = true;
750
PRINT_TRANS(self->myTID, autocommit, type);
752
txn_AddTransaction(type, autocommit, db_id, tab_id, blob_id, blob_ref_id);
753
if (autocommit || TRANS_TYPE_IS_TERMINATED(type))
754
txn_NewTransaction();
761
// Builds a log record for the calling thread's current transaction (self->myTID), computes
// its checksum, writes it to the log file at record position new_offset and adds it to the
// transaction cache. If the log is already full the record is written to the overflow area,
// and on the first overflow record the header is updated to say so.
// NOTE(review): locking, the log-full test, the flush/sync calls and the header update for
// the normal (no overflow) path are only partly visible in this view.
void MSTrans::txn_AddTransaction(uint8_t tran_type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
763
MSTransRec rec = {0,0,0,0,0,0,0}; // This must be set to zero so that the checksum will be valid.
765
uint64_t new_offset = txn_EOL;
766
bool do_flush = true;
771
// Check that the log is not already full.
773
if (!txn_HaveOverflow) { // The first overflow record: update the header.
774
CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_OVERFLOW);
775
txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
777
// Also write the current start and eol (two 8 byte fields, 16 bytes in total).
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
778
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
779
txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
783
txn_HaveOverflow = true;
785
// The overflow area begins directly after the last record of the log.
txn_Overflow = txn_MaxRecords;
788
new_offset = txn_Overflow;
791
// Fill in the record from the arguments and the calling thread's transaction ID.
rec.tr_id = self->myTID ;
792
rec.tr_type = tran_type;
793
rec.tr_db_id = db_id;
794
rec.tr_tab_id = tab_id;
795
rec.tr_blob_id = blob_id;
796
rec.tr_blob_ref_id = blob_ref_id;
798
// Flag the first record of a transaction as its start.
if (self->myStartTxn) {
799
TRANS_SET_START(rec.tr_type);
800
self->myStartTxn = false;
804
TRANS_SET_AUTOCOMMIT(rec.tr_type);
810
// NOTE(review): this switch appears to belong to the debug output below (it feeds the
// fprintf to txn_debug_log); the surrounding conditional is not visible in this view.
switch (TRANS_TYPE(rec.tr_type)) {
811
case MS_ReferenceTxn:
814
case MS_DereferenceTxn:
819
rec.tr_blob_ref_id = 0;
821
case MS_RecoveredTxn:
823
rec.tr_blob_ref_id = 0;
829
if (TRANS_IS_TERMINATED(rec.tr_type))
834
fprintf(txn_debug_log, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" %"PRIu64" %"PRIu64" %"PRIu64" %d\n", self->myTID, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, txn_Start, txn_EOL, new_offset, txn_HaveOverflow);
838
// Seed the checksum field with the log's checksum value before checksumming;
// txn_ValidRecord() does the same when verifying.
rec.tr_check = txn_Checksum;
840
// Calculate the records checksum.
841
rec.tr_check = checksum((uint8_t*)&rec, sizeof(rec));
843
// Write the record to disk.
844
SET_DISK_TRANSREC(&drec, &rec);
847
if (trans_test_crash_point == 9) { // do a partial write before crashing
848
txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec)/2 );
851
txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
853
txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
856
// There is no need to sync if the transaction is still running.
857
if (TRANS_IS_TERMINATED(tran_type)) {
858
CRASH_POINT(4); // This crash will result in a verify error because the txn was committed to the log but not the database.
864
if (!txn_HaveOverflow) { // No need to update the header if overflowing.
865
// Remember where this record was written before eol is moved.
uint64_t rec_offset = txn_EOL;
867
txn_EOL = new_offset;
870
if (txn_EOL == txn_MaxRecords) {
871
// The eol has rolled over.
876
if ((!txn_EOLCheckPoint) || !txn_EOL) {
878
// Flush the previous write if required before updating the header.
879
// This is just in case it crashes during the sync to make sure that the
880
// header information is correct for the data on disk. If the crash occurred
881
// between writing the header and the record the header on disk would be wrong.
890
txn_TransCache->tc_AddRec(rec_offset, &rec, self->myTransRef);
892
// Track the largest number of records the log has held.
if (txn_GetNumRecords() > txn_HighWaterMark)
893
txn_HighWaterMark = txn_GetNumRecords();
895
} else { // Overflow
896
txn_TransCache->tc_AddRec(txn_Overflow, &rec, self->myTransRef);
898
if (txn_Overflow > txn_HighWaterMark)
899
txn_HighWaterMark = txn_Overflow;
902
ASSERT(txn_EOL < txn_MaxRecords);
903
ASSERT(txn_Start < txn_MaxRecords);
907
// Returns the size of the log in bytes: the header plus txn_MaxRecords records.
// Overflow records are not included.
uint64_t MSTrans::txn_GetSize()
909
return sizeof(MSDiskTransHeadRec) + txn_MaxRecords * sizeof(MSDiskTransRec);
913
// Clears the calling thread's transaction ID so that a new one is assigned with the next record.
// NOTE(review): any other statements in this function are not visible in this view.
void MSTrans::txn_NewTransaction()
917
self->myTID = 0; // This will be assigned when the first record is written.
923
// Work done by the log reader when it has no transaction to process: reload the
// transaction cache if the cache asks for it, and wait on the reader thread.
// NOTE(review): the control flow between the reload and the wait is not visible in this view.
void MSTrans::txn_PerformIdleTasks()
927
if (txn_TransCache->tc_ShoulReloadCache()) {
928
txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload());
929
txn_TransCache->tc_CompleteCacheReload();
933
// During backup the reader is suspended. This may need to be changed
934
// if we decide to actually do something here.
935
txn_reader->suspendedWait(1000);
940
// Moves the read position of the log towards pos.
// If pos is in the overflow area, the overflow is absorbed into the log: txn_MaxRecords
// grows to txn_Overflow, the overflow flag is cleared and both are written to the header.
// The start/eol pair in the header is written to disk when the position rolled over or the
// start checkpoint counter has run out.
// NOTE(review): the assignment of txn_Start to pos, the locking and the statement guarded
// by the final TRANS_CAN_RESIZE test are not visible in this view.
void MSTrans::txn_ResetReadPosition(uint64_t pos)
942
// A new position before the current start means the read position has wrapped around.
bool rollover = (pos < txn_Start);
945
if (pos >= txn_MaxRecords) { // Start of overflow
948
// Overflow has occurred and the circular list is now empty
949
// so expand the list to include the overflow and
950
// reset txn_Start and txn_EOL
951
txn_Start = txn_MaxRecords;
952
txn_MaxRecords = txn_Overflow;
954
txn_HaveOverflow = false;
957
CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
958
CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
959
txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
960
txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
968
ASSERT(txn_Start <= txn_MaxRecords);
971
// Count down the checkpoint by the number of records the start position moves forward.
txn_StartCheckPoint -= (pos - txn_Start);
973
// Flush the header if the read position has rolled over or it is time.
974
if ( rollover || (txn_StartCheckPoint <=0)) {
976
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
977
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
978
txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
982
txn_StartCheckPoint = txn_MaxCheckPoint;
988
if (TRANS_CAN_RESIZE)
994
// Asks the transaction cache for the next transaction and whether it is terminated.
// NOTE(review): the declaration of ref and the return statement are not visible in this view;
// presumably the result is the terminated flag - confirm.
bool MSTrans::txn_haveNextTransaction()
996
bool terminated = false;
999
txn_TransCache->tc_GetTransaction(&ref, &terminated);
1005
// Returns the next record of the next terminated transaction in tran/state.
// Must be called by the log reader thread (asserted below). Waits, performing idle tasks,
// until a terminated transaction is available or the thread is told to quit. When all
// records of the current transaction have been returned, the transaction is freed, the
// read position is advanced and the cache is reloaded if required.
// NOTE(review): locking, loop structure and return paths are only partly visible in this view.
void MSTrans::txn_GetNextTransaction(MSTransPtr tran, MS_TxnState *state)
1008
uint64_t log_position;
1011
ASSERT(txn_reader == self);
1015
// Get the next completed transaction.
1016
// this will suspend the current thread, which is assumed
1017
// to be the log reader, until one is available.
1018
while ((!txn_IsTxnValid) && !self->myMustQuit) {
1020
// wait until backup has completed.
1021
while (txn_Doingbackup && !self->myMustQuit)
1022
txn_PerformIdleTasks();
1024
// Only a terminated transaction becomes the current transaction.
if (txn_TransCache->tc_GetTransaction(&txn_CurrentTxn, &terminated) && terminated) {
1025
txn_IsTxnValid = true;
1028
txn_PerformIdleTasks();
1031
if (self->myMustQuit)
1034
// Fetch the next record of the current transaction; txn_TxnIndex advances with every call.
if (txn_TransCache->tc_GetRecAt(txn_CurrentTxn, txn_TxnIndex++, tran, state))
1038
txn_TransCache->tc_FreeTransaction(txn_CurrentTxn);
1040
// Move the read position to the start of the next transaction in the cache, if there is one.
if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) {
1041
txn_ResetReadPosition(log_position);
1043
if (txn_TransCache->tc_ShoulReloadCache()) {
1044
uint64_t pos = txn_TransCache->tc_StartCacheReload();
1045
txn_ResetReadPosition(pos);
1046
txn_LoadTransactionCache(pos);
1047
txn_TransCache->tc_CompleteCacheReload();
1049
// Lock the object to prevent writer thread updates while I check again.
1050
// This is to ensure that txn_EOL is not changed between the call to
1051
// tc_GetTransactionStartPosition() and setting the read position.
1053
if (txn_TransCache->tc_GetTransactionStartPosition(&log_position))
1054
txn_ResetReadPosition(log_position);
1056
txn_ResetReadPosition(txn_EOL);
1061
txn_IsTxnValid = false;
1065
unlock_(txn_reader);
1070
// Fills stats with the current state of the log and its cache.
// ts_LogSize is txn_Overflow while overflowing, otherwise the number of records in the log.
// ts_PercentFull is relative to the requested list size stored in the header.
void MSTrans::txn_GetStats(MSTransStatsPtr stats)
1073
if (txn_HaveOverflow) {
1074
stats->ts_IsOverflowing = true;
1075
stats->ts_LogSize = txn_Overflow;
1077
stats->ts_IsOverflowing = false;
1078
stats->ts_LogSize = txn_GetNumRecords();
1080
stats->ts_PercentFull = (stats->ts_LogSize * 100) / CS_GET_DISK_8(txn_DiskHeader.th_requested_list_size_8);
1082
stats->ts_MaxSize = txn_HighWaterMark;
1083
stats->ts_OverflowCount = txn_OverflowCount;
1085
// Cache statistics come straight from the transaction cache.
stats->ts_TransCacheSize = txn_TransCache->tc_GetCacheUsed();
1086
stats->ts_PercentTransCacheUsed = txn_TransCache->tc_GetPercentCacheUsed();
1087
stats->ts_PercentCacheHit = txn_TransCache->tc_GetPercentCacheHit();
1090
// Stores new_size as the requested cache size in the header on disk and resizes the
// transaction cache.
// NOTE(review): the lock calls matching unlock_(txn_reader) are not visible in this view.
void MSTrans::txn_SetCacheSize(uint32_t new_size)
1093
// Important lock order. Writer threads never lock the reader but the reader
1094
// may lock this object so always lock the reader first.
1098
CS_SET_DISK_4(txn_DiskHeader.th_requested_cache_size_4, new_size);
1100
txn_File->write(&(txn_DiskHeader.th_requested_cache_size_4), offsetof(MSDiskTransHeadRec, th_requested_cache_size_4), 4);
1104
txn_TransCache->tc_SetSize(new_size);
1107
unlock_(txn_reader);
1111
// Sets the requested log size. new_size is a size in bytes including the header; it is
// converted to a record count, raised to a minimum of 10 records, and written to the
// header on disk.
// NOTE(review): the lock calls matching unlock_(txn_reader) and any call that applies the
// new size are not visible in this view.
void MSTrans::txn_SetLogSize(uint64_t new_size)
1115
// Important lock order. Writer threads never lock the reader but the reader
1116
// may lock this object so always lock the reader first.
1120
// NOTE(review): if new_size is smaller than sizeof(MSDiskTransHeadRec) the unsigned
// subtraction wraps to a very large value, which the minimum check below does not catch;
// confirm that callers validate new_size.
txn_ReqestedMaxRecords = (new_size - sizeof(MSDiskTransHeadRec)) / sizeof(MSDiskTransRec);
1122
if (txn_ReqestedMaxRecords < 10)
1123
txn_ReqestedMaxRecords = 10;
1125
CS_SET_DISK_8(txn_DiskHeader.th_requested_list_size_8, txn_ReqestedMaxRecords);
1127
txn_File->write(&(txn_DiskHeader.th_requested_list_size_8), offsetof(MSDiskTransHeadRec, th_requested_list_size_8), 8);
1132
unlock_(txn_reader);
1137
// A helper class for resetting database IDs in the transaction log.
1138
// Reuses the ReadTXNLog scan: rl_CanContinue() always returns true and rl_Load() writes
// records of the selected database back to the log with rl_Store().
// NOTE(review): member declarations, access specifiers and parts of the method bodies are
// not visible in this view.
class DBSearchTXNLog : ReadTXNLog {
1140
DBSearchTXNLog(MSTrans *log): ReadTXNLog(log), sdb_db_id(0), sdb_isDirty(false) {}
1145
// Never stop early: the whole log is scanned regardless of the cache state.
virtual bool rl_CanContinue() { return true;}
1146
virtual void rl_Load(uint64_t log_position, MSTransPtr rec)
1148
// Only records belonging to the database being searched for are written back.
if (rec->tr_db_id == sdb_db_id) {
1151
rl_Store(log_position, rec);
1155
void SetDataBaseIDToZero(uint32_t db_id)
1158
// Scan the log from its start position.
rl_ReadLog(rl_log->txn_GetStartPosition(), false);
1164
// Dropping the database from the transaction log just involves
1165
// scanning the log and setting the database id of any transactions
1166
// involving the dropped database to zero.
1167
// Removes a dropped database from the transaction log: first from the transaction cache,
// then from the log file by scanning it with DBSearchTXNLog.
// NOTE(review): the lock calls matching unlock_(txn_reader) are not visible in this view.
void MSTrans::txn_dropDatabase(uint32_t db_id)
1171
// Important lock order. Writer threads never lock the reader but the reader
1172
// may lock this object so always lock the reader first.
1176
// Clear any transaction records in the cache for the dropped database;
1177
txn_TransCache->tc_dropDatabase(db_id);
1179
// Scan the log setting the database ID for any record belonging to the
1180
// dropped database to zero.
1181
DBSearchTXNLog searchLog(this);
1183
searchLog.SetDataBaseIDToZero(db_id);
1186
unlock_(txn_reader);
1191
void MSTrans::txn_DumpLog(const char *file)
1193
size_t size, read_start = 0;
1197
fptr = fopen(file, "w+");
1204
size = txn_Overflow;
1206
size = txn_MaxRecords;
1208
// Dump all the records
1210
MSDiskTransRec diskRecords[1000];
1219
// Read the next block of records.
1220
offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);
1221
txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
1223
for (uint32_t i = 0; i < read_size; i++) {
1224
const char *ttype, *cmt;
1226
MSDiskTransPtr drec = diskRecords + i;
1227
GET_DISK_TRANSREC(&rec, drec);
1229
switch (TRANS_TYPE(rec.tr_type)) {
1230
case MS_ReferenceTxn:
1233
case MS_DereferenceTxn:
1236
case MS_RollBackTxn:
1238
rec.tr_blob_ref_id = 0;
1240
case MS_RecoveredTxn:
1242
rec.tr_blob_ref_id = 0;
1248
if (TRANS_IS_TERMINATED(rec.tr_type))
1254
fprintf(fptr, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" \t %s %s %s\n", rec.tr_id, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id,
1255
((read_start + i) == txn_Start) ? "START":"",
1256
((read_start + i) == txn_EOL) ? "EOL":"",
1257
((read_start + i) == txn_MaxRecords) ? "OverFlow":""
1262
read_start += read_size;