1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25
* PBMS transaction handling.
27
* PBMS uses 1 circular transaction log. All BLOB reference operations are written to this log
28
* and are applied to the repository when committed. There is 1 thread dedicated to reading the
29
* transaction log and applying the changes. During an engine level backup this thread is suspended
30
* so that no transactions will be applied to the repository files as they are backed up.
39
#include "CSStrUtil.h"
40
#include "CSStorage.h"
42
#include "TransLog_ms.h"
43
#include "TransCache_ms.h"
46
// Selects which CRASH_POINT(p) site should crash: a site fires when its p equals this value.
uint32_t trans_test_crash_point;
47
// Test hook: when p equals trans_test_crash_point, print the source location together
// with txn_Start and txn_EOL, then write through a NULL pointer to force a crash.
#define CRASH_POINT(p) { if (p == trans_test_crash_point) { char *ptr = NULL; printf("Crash on demand at: %s(%d), start: %"PRIu64", eol: %"PRIu64"\n", __FILE__, __LINE__, txn_Start, txn_EOL); *ptr = 88;}}
49
// No-op variant of CRASH_POINT.
// NOTE(review): the preprocessor conditional that selects between the two definitions is not visible here.
#define CRASH_POINT(p)
52
// Magic number and version stored in, and checked against, the log header.
#define MS_TRANS_LOG_MAGIC 0xA6E7D7B3
53
#define MS_TRANS_LOG_VERSION 1
54
// Values stored in the header field th_recovered_1.
#define MS_TRANS_LOG_RECOVERED 0XA1
55
#define MS_TRANS_LOG_NOT_RECOVERED 0XA2
56
// Values stored in the header field th_overflow_1.
#define MS_TRANS_NO_OVERFLOW 0XB1
57
#define MS_TRANS_OVERFLOW 0XB2
59
// Defaults applied when a new log file is created.
#define DFLT_TRANS_CHECKPOINT_THRESHOLD 1024
61
#define DFLT_TRANS_LOG_LIST_SIZE (1024 * 10)
62
#define DFLT_TRANS_CACHE_SIZE (500)
64
// True when the current record capacity differs from the requested capacity,
// EOL is not behind the start position, and there are no overflow records.
#define TRANS_CAN_RESIZE ((txn_MaxRecords != txn_ReqestedMaxRecords) && (txn_EOL >= txn_Start) && !txn_HaveOverflow)
66
// On-disk form of one transaction log record. The fields are filled and read
// with the CS_SET_DISK_n / CS_GET_DISK_n macros (see SET_DISK_TRANSREC and
// GET_DISK_TRANSREC).
typedef struct MSDiskTrans {
67
CSDiskValue4 dtr_id_4; // The transaction ID.
68
CSDiskValue1 dtr_type_1; // The transaction type. If the first bit is set then the transaction is an autocommit.
69
CSDiskValue1 dtr_check_1; // The transaction record checksum.
70
CSDiskValue4 dtr_db_id_4; // The database ID for the operation.
71
CSDiskValue4 dtr_tab_id_4; // The table ID for the operation.
72
CSDiskValue8 dtr_blob_id_8; // The blob ID for the operation.
73
CSDiskValue8 dtr_blob_ref_id_8; // The blob reference ID.
74
} MSDiskTransRec, *MSDiskTransPtr;
76
// Copies every field of the in-memory transaction record (s) into the on-disk
// record (d) using the CS_SET_DISK_n macro matching each field's width.
#define SET_DISK_TRANSREC(d, s) { \
77
CS_SET_DISK_4((d)->dtr_id_4, (s)->tr_id);\
78
CS_SET_DISK_1((d)->dtr_type_1, (s)->tr_type);\
79
CS_SET_DISK_1((d)->dtr_check_1, (s)->tr_check);\
80
CS_SET_DISK_4((d)->dtr_db_id_4, (s)->tr_db_id);\
81
CS_SET_DISK_4((d)->dtr_tab_id_4, (s)->tr_tab_id);\
82
CS_SET_DISK_8((d)->dtr_blob_id_8, (s)->tr_blob_id);\
83
CS_SET_DISK_8((d)->dtr_blob_ref_id_8, (s)->tr_blob_ref_id);\
86
// Copies every field of the on-disk record (d) into the in-memory transaction
// record (s) using the CS_GET_DISK_n macro matching each field's width.
#define GET_DISK_TRANSREC(s, d) { \
87
(s)->tr_id = CS_GET_DISK_4((d)->dtr_id_4);\
88
(s)->tr_type = CS_GET_DISK_1((d)->dtr_type_1);\
89
(s)->tr_check = CS_GET_DISK_1((d)->dtr_check_1);\
90
(s)->tr_db_id = CS_GET_DISK_4((d)->dtr_db_id_4);\
91
(s)->tr_tab_id = CS_GET_DISK_4((d)->dtr_tab_id_4);\
92
(s)->tr_blob_id = CS_GET_DISK_8((d)->dtr_blob_id_8);\
93
(s)->tr_blob_ref_id = CS_GET_DISK_8((d)->dtr_blob_ref_id_8);\
96
// Computes a one-byte checksum over the len bytes starting at data.
// NOTE(review): the loop header and part of the loop body are not visible in
// this view; the comments below describe only the statements shown.
static uint8_t checksum(uint8_t *data, size_t len)
98
register uint32_t sum = 0, g;
101
// Begin at the last byte of the buffer.
chk = data + len - 1;
103
// Shift the running sum left by four bits and add the current byte.
sum = (sum << 4) + *chk;
104
// g receives the top four bits of sum; when any are set they are folded back in.
if ((g = sum & 0xF0000000)) {
105
sum = sum ^ (g >> 24);
110
// XOR the four bytes of the 32-bit sum together to get the 8-bit result.
return (uint8_t) (sum ^ (sum >> 24) ^ (sum >> 16) ^ (sum >> 8));
115
// Member initializers: no transaction cache yet, all flags false, and the
// counters / checkpoint values start at zero.
txn_TransCache(NULL),
116
txn_IsTxnValid(false),
117
txn_HaveOverflow(false),
118
txn_OverflowCount(0),
119
txn_HighWaterMark(0),
122
txn_StartCheckPoint(0),
123
txn_EOLCheckPoint(0),
126
txn_Recovered(false),
127
txn_Doingbackup(false)
135
txn_TransCache->release();
139
// Writes the final header state to the log file: the next transaction ID, then
// the start / EOL / checksum fields, and last of all the recovered flag.
void MSTrans::txn_Close()
143
// Save the next transaction ID in the header.
144
CS_SET_DISK_4(txn_DiskHeader.th_next_txn_id_4, txn_MaxTID);
145
txn_File->write(&(txn_DiskHeader.th_next_txn_id_4), offsetof(MSDiskTransHeadRec, th_next_txn_id_4), 4 );
147
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
148
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
149
CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
150
// A single write covers everything from th_start_8 to the end of the header.
txn_File->write(&(txn_DiskHeader.th_start_8),
151
offsetof(MSDiskTransHeadRec, th_start_8),
152
sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
158
// Write the recovered flag separately, just in case of a crash during the write operation.
159
CS_SET_DISK_1(txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
160
txn_File->write(&(txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1 );
170
void MSTrans::txn_SetFile(CSFile *tr_file)
177
static FILE *txn_debug_log;
180
// Builds an MSTrans object for the transaction log at log_path.
//  - If the file does not exist, a new log is created, its record area is
//    pre-written with zeroed records and a default header is stored.
//  - Otherwise the existing header is read and validated (magic, version,
//    overflow flag, recovered flag, file size) and the members are loaded from it.
// After that the log is recovered, the header is marked "not recovered" while
// the log is in use, the transaction cache is loaded and a resize is attempted.
// NOTE(review): several lines (local declarations, the conditions guarding the
// dump / recover calls, the return statement) are not visible in this view.
MSTrans *MSTrans::txn_NewMSTrans(const char *log_path, bool dump_log)
182
MSTrans *trans = NULL;
187
new_(trans, MSTrans());
190
path = CSPath::newPath(log_path);
196
if (!path->exists()) { // Create the transaction log.
197
CSFile *tr_file = path->createFile(CSFile::CREATE);
200
log_size = DFLT_TRANS_LOG_LIST_SIZE * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
202
// Preallocate the log space and initialize it.
203
MSDiskTransRec recs[1024] = {0};
204
off_t offset = sizeof(MSDiskTransHeadRec);
205
uint64_t num_records = DFLT_TRANS_LOG_LIST_SIZE;
208
// Write the zeroed records in chunks of at most 1024 records.
while (num_records) {
209
if (num_records < 1024)
213
tr_file->write(recs, offset, size * sizeof(MSDiskTransRec));
214
offset += size * sizeof(MSDiskTransRec);
218
trans->txn_MaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
219
trans->txn_ReqestedMaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
220
trans->txn_MaxCheckPoint = DFLT_TRANS_CHECKPOINT_THRESHOLD;
221
trans->txn_MaxTID = 1;
223
// Initialize the log header.
224
CS_SET_DISK_4(trans->txn_DiskHeader.th_magic_4, MS_TRANS_LOG_MAGIC);
225
CS_SET_DISK_2(trans->txn_DiskHeader.th_version_2, MS_TRANS_LOG_VERSION);
227
CS_SET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4, trans->txn_MaxTID);
229
CS_SET_DISK_2(trans->txn_DiskHeader.th_check_point_2, trans->txn_MaxCheckPoint);
231
CS_SET_DISK_8(trans->txn_DiskHeader.th_list_size_8, trans->txn_MaxRecords);
232
CS_SET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8, trans->txn_ReqestedMaxRecords);
234
CS_SET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4, DFLT_TRANS_CACHE_SIZE);
236
CS_SET_DISK_8(trans->txn_DiskHeader.th_start_8, 0);
237
CS_SET_DISK_8(trans->txn_DiskHeader.th_eol_8, 0);
239
CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
240
CS_SET_DISK_1(trans->txn_DiskHeader.th_checksum_1, 1);
241
CS_SET_DISK_1(trans->txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
243
// The whole header is written at file offset 0.
tr_file->write(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec));
245
trans->txn_SetFile(tr_file);
247
trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
249
trans->txn_TransCache = MSTransCache::newMSTransCache(DFLT_TRANS_CACHE_SIZE);
250
} else { // The transaction log already exists
251
bool overflow, recovered;
253
CSFile *tr_file = path->createFile(CSFile::DEFAULT); // Open read/write
256
// Read the log header:
257
if (tr_file->read(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec), 0) < sizeof(MSDiskTransHeadRec)) {
263
// check the log header:
264
if (CS_GET_DISK_4(trans->txn_DiskHeader.th_magic_4) != MS_TRANS_LOG_MAGIC)
265
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_HEADER_MAGIC);
267
// NOTE(review): any version mismatch (older or newer) is reported as CS_ERR_VERSION_TOO_NEW.
if (CS_GET_DISK_2(trans->txn_DiskHeader.th_version_2) != MS_TRANS_LOG_VERSION)
268
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_VERSION_TOO_NEW);
271
// The overflow flag must be one of the two known values, otherwise the header is rejected.
if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_NO_OVERFLOW)
273
else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_OVERFLOW)
276
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
279
// Likewise the recovered flag must be one of the two known values.
if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_NOT_RECOVERED)
281
else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_RECOVERED)
284
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
286
// Check that the log is the expected size.
287
log_size = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8) * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
289
// A file shorter than expected is always an error; a longer file is only
// accepted when the header says there are overflow records.
if ((log_size > tr_file->getEOF()) ||
290
((log_size < tr_file->getEOF()) && !overflow)){
292
char buffer[CS_EXC_MESSAGE_SIZE];
293
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unexpected transaction log size: ");
294
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, path->getCString());
295
CSException::throwException(CS_CONTEXT, CS_ERR_BAD_FILE_HEADER, buffer);
298
trans->txn_MaxTID = CS_GET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4);
300
// Looks good, we will assume it is a valid log file.
301
trans->txn_TransCache = MSTransCache::newMSTransCache(CS_GET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4));
304
trans->txn_SetFile(tr_file);
306
trans->txn_MaxCheckPoint = CS_GET_DISK_2(trans->txn_DiskHeader.th_check_point_2);
308
trans->txn_MaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8);
309
trans->txn_ReqestedMaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8);
311
trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
312
trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
313
trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
314
trans->txn_HaveOverflow = overflow;
316
// With overflow, the overflow position is derived from the actual file size in records.
trans->txn_Overflow = (tr_file->getEOF() - sizeof(MSDiskTransHeadRec)) /sizeof(MSDiskTransRec);
318
trans->txn_Overflow = 0;
322
printf("Recovering overflow log\n");
325
// NOTE(review): time(NULL) yields a time_t, which may not match the %d conversion on every platform — confirm.
snprintf(name, 100, "%dms-trans-log.dump", time(NULL));
326
trans->txn_DumpLog(name);
329
// Recover the log if required.
331
trans->txn_Recover();
336
trans->txn_Recovered = true; // Any recovery required has been completed.
338
// The log has been recovered so these values should be valid:
339
trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
340
trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
342
// Set the header to indicate that the log has not been closed properly.
343
// This is reset when the log is closed during shutdown.
344
CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_NOT_RECOVERED);
345
trans->txn_File->write(&(trans->txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1);
347
// Load the transaction records into memory.
348
trans->txn_TransCache->tc_StartCacheReload(true);
349
trans->txn_LoadTransactionCache(trans->txn_Start);
350
trans->txn_TransCache->tc_CompleteCacheReload();
352
if (trans->txn_MaxRecords != trans->txn_ReqestedMaxRecords)
353
trans->txn_ResizeLog(); // Try to resize but it may not be possible yet.
360
// Open the debug trace file; a failure to open it is reported with perror().
txn_debug_log = fopen("log_dump.txt", "w+");
361
if (!txn_debug_log) {
362
perror("log_dump.txt");
369
// Checks whether the checksum stored in rec matches one recomputed over the record.
// The record is modified only temporarily: tr_check is restored before leaving.
bool MSTrans::txn_ValidRecord(MSTransPtr rec)
371
uint8_t check = rec->tr_check;
374
// The checksum is calculated with tr_check set to the log's txn_Checksum value,
// which mirrors how txn_AddTransaction() prepares a record before calling checksum().
rec->tr_check = txn_Checksum;
375
ok = (checksum((uint8_t*)rec, sizeof(MSTransRec)) == check);
376
rec->tr_check = check;
380
// Reads the record at position index from the log file into rec.
// index is a record number; the file offset is the header size plus
// index times the on-disk record size.
void MSTrans::txn_GetRecordAt(uint64_t index, MSTransPtr rec)
385
// Read 1 record from the log and convert it from disk format.
386
offset = sizeof(MSDiskTransHeadRec) + index * sizeof(MSDiskTransRec);
387
txn_File->read(&drec, offset, sizeof(MSDiskTransRec), sizeof(MSDiskTransRec));
388
GET_DISK_TRANSREC(rec, &drec);
391
// Recovery involves finding the start of the first record and the eof
392
// position. The positions will be found at or after the position stored
// (in the header; the rest of this comment is not visible in this view).
394
void MSTrans::txn_Recover()
396
MSTransRec rec = {0};
397
uint64_t original_eol = txn_EOL;
401
printf("Recovering transaction log!\n");
405
// Search for the last valid record in the log starting from the last
406
// known position stored in the header.
407
for (; txn_EOL < txn_MaxRecords; txn_EOL++) {
408
txn_GetRecordAt(txn_EOL, &rec);
409
if (! txn_ValidRecord(&rec))
413
if (txn_EOL == txn_MaxRecords) {
414
// It looks like all the records in the log are valid?
415
// This is strange but could happen if the crash
416
// occurred just before updating the header as the
417
// eol position rolled over to the top of the log.
423
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
425
// If the actual eol has moved past the recorded start position
426
// then the actual start position must be somewhere beyond
428
if (((original_eol < txn_Start) || (original_eol > txn_EOL)) && (txn_EOL >= txn_Start))
429
txn_Start = txn_EOL +1;
431
// Position the start at the beginning of a transaction.
432
uint64_t end_search = (txn_Start < txn_EOL)? txn_EOL : txn_MaxRecords;
433
for (; txn_Start < end_search; txn_Start++) {
434
txn_GetRecordAt(txn_Start, &rec);
435
if (TRANS_IS_START(rec.tr_type))
439
if (txn_Start == end_search)
442
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
444
txn_TransCache->tc_SetRecovering(true);
445
// Load the transaction records into the cache.
446
txn_TransCache->tc_StartCacheReload(true);
447
txn_LoadTransactionCache(txn_Start);
448
txn_TransCache->tc_CompleteCacheReload();
450
// Now go through all the transactions and add rollbacks for any
451
// unterminated transactions.
454
while (txn_TransCache->tc_GetTransaction(&ref, &terminated)) {
456
txn_MaxTID = txn_TransCache->tc_GetTransactionID(ref); // Save the TID of the last transaction.
458
// Set up the thread state so that txn_AddTransaction() appends an
// MS_RecoveredTxn record to this existing transaction (not a new one).
self->myTID = txn_MaxTID;
459
self->myTransRef = ref;
460
self->myStartTxn = false;
461
txn_AddTransaction(MS_RecoveredTxn);
464
txn_TransCache->tc_FreeTransaction(ref);
466
// Load the next block of transactions into the cache.
467
// This needs to be done after each tc_GetTransaction() to make sure
468
// that if the transaction terminator is somewhere in the log
469
// it will get read even if the cache is completely full.
470
if (txn_TransCache->tc_ShoulReloadCache()) {
471
txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload(true));
472
txn_TransCache->tc_CompleteCacheReload();
477
txn_TransCache->tc_SetRecovering(false);
478
self->myTransRef = 0;
480
// Update the header again in case rollbacks have been added.
481
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
486
// Tells rl_ReadLog() whether to keep loading records; this implementation
// defers the decision to the log's transaction cache.
bool ReadTXNLog::rl_CanContinue()
488
return rl_log->txn_TransCache->tc_ContinueCacheReload();
491
// Called by rl_ReadLog() for each record read; this implementation adds the
// record, together with its log position, to the log's transaction cache.
void ReadTXNLog::rl_Load(uint64_t log_position, MSTransPtr rec)
493
rl_log->txn_TransCache->tc_AddRec(log_position, rec);
496
// Writes rec back to the log file at record position log_position, after
// converting it to the on-disk format.
void ReadTXNLog::rl_Store(uint64_t log_position, MSTransPtr rec)
499
SET_DISK_TRANSREC(&drec, rec);
501
rl_log->txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + log_position * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec));
504
// Flushes and then syncs the log file.
void ReadTXNLog::rl_Flush()
506
rl_log->txn_File->flush();
507
rl_log->txn_File->sync();
510
// Reads log records starting at record position read_start and passes each
// one to rl_Load(), for as long as rl_CanContinue() allows.
// A read_start at or beyond txn_MaxRecords means the overflow area is being read.
// The function calls itself to continue into the overflow area, and (when
// log_locked is false) once more if the record count changed during the read.
// NOTE(review): several lines (declarations, else branches, locking) are not
// visible in this view.
void ReadTXNLog::rl_ReadLog(uint64_t read_start, bool log_locked)
512
uint64_t size, orig_size;
513
bool reading_overflow = (read_start >= rl_log->txn_MaxRecords);
517
// Get the number of transaction records to be loaded.
518
if (reading_overflow) {
519
orig_size = rl_log->txn_Overflow;
520
size = rl_log->txn_Overflow - read_start;
522
orig_size = rl_log->txn_GetNumRecords();
524
if (rl_log->txn_Start <= read_start)
525
size = orig_size - (read_start - rl_log->txn_Start);
527
size = rl_log->txn_EOL - read_start;
530
// Load all the records.
531
while (size && rl_CanContinue()) {
532
MSDiskTransRec diskRecords[1000];
541
// Check if we have reached the wrap around point in the log.
542
if ((!reading_overflow) && (rl_log->txn_EOL < read_start) && ((rl_log->txn_MaxRecords - read_start) < read_size))
543
read_size = rl_log->txn_MaxRecords - read_start ;
545
// Read the next block of records.
546
offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);
547
rl_log->txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
549
// Convert the records from disk format and add them to the cache.
550
for (uint32_t i = 0; i < read_size && rl_CanContinue(); i++) {
552
MSDiskTransPtr drec = diskRecords + i;
553
GET_DISK_TRANSREC(&rec, drec);
555
rl_Load(read_start + i, &rec);
559
read_start += read_size;
560
if (read_start == rl_log->txn_MaxRecords)
564
// Continue into the overflow area when there is one and it has not been read yet.
if (rl_log->txn_HaveOverflow && !reading_overflow) {
565
if (rl_CanContinue())
566
rl_ReadLog(rl_log->txn_MaxRecords, false);
568
} else if (!log_locked) {
569
// The following is intended to prevent the case where a writer
570
// writes a txn record while the cache is full but just after
571
// the reload has completed. If the cache is not yet full we need
572
// to load as many of the new records into cache as possible.
576
if (reading_overflow)
577
new_size = rl_log->txn_Overflow;
579
new_size = rl_log->txn_GetNumRecords();
580
if (rl_CanContinue() && (orig_size != new_size)) {
581
rl_ReadLog(read_start, true);
590
// Loads log records into the transaction cache, starting at record position
// read_start, using a ReadTXNLog helper bound to this log.
void MSTrans::txn_LoadTransactionCache(uint64_t read_start)
592
ReadTXNLog log(this);
594
log.rl_ReadLog(read_start, false);
595
txn_TransCache->tc_UpdateCacheVersion(); // Signal writes to recheck cache for overflow txn refs.
599
// Moves the log's record capacity (txn_MaxRecords) towards the requested
// capacity (txn_ReqestedMaxRecords) when TRANS_CAN_RESIZE allows it, then
// updates the header's list size and the file length to match.
// Shrinking is limited to what the visible code computes in max_resize;
// growing jumps straight to the requested size.
// NOTE(review): some branches and the tail of this function are not visible here.
void MSTrans::txn_ResizeLog()
604
if (TRANS_CAN_RESIZE) {
605
// TRANS_CAN_RESIZE checks that there is no overflow and that the start position
606
// is not beyond eol. This implies that the space from eol to the end of file doesn't contain
611
uint64_t old_size = txn_MaxRecords;
613
if (txn_MaxRecords > txn_ReqestedMaxRecords) { // Shrink the log
614
uint64_t max_resize = txn_MaxRecords - txn_EOL;
616
if ( txn_Start == txn_EOL)
617
max_resize = txn_MaxRecords;
619
max_resize = txn_MaxRecords - txn_EOL;
620
if (!txn_Start) // If start is at '0' then the EOL cannot be wrapped.
625
// Never shrink below the requested size.
if (max_resize > (txn_MaxRecords - txn_ReqestedMaxRecords))
626
max_resize = txn_MaxRecords - txn_ReqestedMaxRecords;
628
txn_MaxRecords -= max_resize;
630
txn_MaxRecords = txn_ReqestedMaxRecords; // Grow the log
633
// The resize is reported through the exception log.
char buffer[CS_EXC_MESSAGE_SIZE];
634
snprintf(buffer, CS_EXC_MESSAGE_SIZE, "Resizing the Transaction log from %"PRIu64" to %"PRIu64" \n", old_size, txn_MaxRecords);
635
CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, buffer);
638
CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
640
// Set the file length for the new capacity, then store the new list size in the header.
txn_File->setEOF(txn_MaxRecords * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec));
641
txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
643
if (txn_Start == txn_EOL) {
646
} else if (txn_MaxRecords == txn_EOL) {
658
// Resets both checkpoint counters to txn_MaxCheckPoint and writes the current
// start, EOL and checksum values to the header on disk.
// NOTE(review): the statements that change txn_EOL / txn_Start themselves are
// not visible in this view.
void MSTrans::txn_ResetEOL()
662
txn_EOLCheckPoint = txn_MaxCheckPoint;
663
txn_StartCheckPoint = txn_MaxCheckPoint;
667
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
668
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
669
CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
670
// A single write covers everything from th_start_8 to the end of the header.
txn_File->write(&(txn_DiskHeader.th_start_8),
671
offsetof(MSDiskTransHeadRec, th_start_8),
672
sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
681
// Adds one operation to the transaction log on behalf of the calling thread.
// The visible code assigns the thread a transaction ID and a new cache
// reference and marks the next record as a transaction start, writes the
// record through txn_AddTransaction(), and resets the thread's transaction
// state when the record is an autocommit or of a terminating type.
// NOTE(review): the condition guarding the ID assignment is not visible here.
void MSTrans::txn_LogTransaction(MS_Txn type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
688
self->myTID = txn_MaxTID;
689
self->myTransRef = TRANS_CACHE_NEW_REF;
690
self->myStartTxn = true;
693
txn_AddTransaction(type, autocommit, db_id, tab_id, blob_id, blob_ref_id);
694
if (autocommit || TRANS_TYPE_IS_TERMINATED(type))
695
txn_NewTransaction();
702
// Builds a transaction record from the arguments and the calling thread's
// transaction state, checksums it, writes it to the log file (in the circular
// area or, when that is full, in the overflow area) and adds it to the
// transaction cache.
// NOTE(review): many lines (the full-log test, else branches, flush/sync calls,
// debug-only conditionals) are not visible in this view; comments below
// describe only the statements shown.
void MSTrans::txn_AddTransaction(uint8_t tran_type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
704
MSTransRec rec = {0}; // This must be set to zero so that the checksum will be valid.
706
uint64_t new_offset = txn_EOL;
707
bool do_flush = true;
712
// Check that the log is not already full.
714
if (!txn_HaveOverflow) { // The first overflow record: update the header.
715
CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_OVERFLOW);
716
txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
718
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
719
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
720
// 16 bytes: presumably th_start_8 immediately followed by th_eol_8 — confirm against the header layout.
txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
724
txn_HaveOverflow = true;
726
txn_Overflow = txn_MaxRecords;
729
new_offset = txn_Overflow;
732
rec.tr_id = self->myTID ;
733
rec.tr_type = tran_type;
734
rec.tr_db_id = db_id;
735
rec.tr_tab_id = tab_id;
736
rec.tr_blob_id = blob_id;
737
rec.tr_blob_ref_id = blob_ref_id;
739
// The first record of a transaction is flagged as its start.
if (self->myStartTxn) {
740
TRANS_SET_START(rec.tr_type);
741
self->myStartTxn = false;
745
TRANS_SET_AUTOCOMMIT(rec.tr_type);
751
switch (TRANS_TYPE(rec.tr_type)) {
752
case MS_ReferenceTxn:
755
case MS_DereferenceTxn:
760
rec.tr_blob_ref_id = 0;
762
case MS_RecoveredTxn:
764
rec.tr_blob_ref_id = 0;
770
if (TRANS_IS_TERMINATED(rec.tr_type))
775
fprintf(txn_debug_log, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" %"PRIu64" %"PRIu64" %"PRIu64" %d\n", self->myTID, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, txn_Start, txn_EOL, new_offset, txn_HaveOverflow);
779
// Seed the check field with the log's checksum value before checksumming the record.
rec.tr_check = txn_Checksum;
781
// Calculate the record's checksum.
782
rec.tr_check = checksum((uint8_t*)&rec, sizeof(rec));
784
// Write the record to disk.
785
SET_DISK_TRANSREC(&drec, &rec);
788
if (trans_test_crash_point == 9) { // do a partial write before crashing
789
txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec)/2 );
792
txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
794
txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
797
// There is no need to sync if the transaction is still running.
798
if (TRANS_IS_TERMINATED(tran_type)) {
799
CRASH_POINT(4); // This crash will result in a verify error because the txn was committed to the log but not the database.
805
if (!txn_HaveOverflow) { // No need to update the header if overflowing.
806
uint64_t rec_offset = txn_EOL;
808
txn_EOL = new_offset;
811
if (txn_EOL == txn_MaxRecords) {
812
// The eol has rolled over.
817
if ((!txn_EOLCheckPoint) || !txn_EOL) {
819
// Flush the previous write if required before updating the header.
820
// This is just in case it crashes during the sync to make sure that the
821
// header information is correct for the data on disk. If the crash occurred
822
// between writing the header and the record the header on disk would be wrong.
831
txn_TransCache->tc_AddRec(rec_offset, &rec, self->myTransRef);
833
if (txn_GetNumRecords() > txn_HighWaterMark)
834
txn_HighWaterMark = txn_GetNumRecords();
836
} else { // Overflow
837
txn_TransCache->tc_AddRec(txn_Overflow, &rec, self->myTransRef);
839
if (txn_Overflow > txn_HighWaterMark)
840
txn_HighWaterMark = txn_Overflow;
843
ASSERT(txn_EOL < txn_MaxRecords);
844
ASSERT(txn_Start < txn_MaxRecords);
848
// Returns the log size in bytes implied by the current capacity: the header
// plus txn_MaxRecords on-disk records (overflow records are not included).
uint64_t MSTrans::txn_GetSize()
850
return sizeof(MSDiskTransHeadRec) + txn_MaxRecords * sizeof(MSDiskTransRec);
854
// Clears the calling thread's transaction ID so that the next logged record
// belongs to a new transaction.
void MSTrans::txn_NewTransaction()
858
self->myTID = 0; // This will be assigned when the first record is written.
864
// Work done by the reader while it has no transaction to process: reload the
// transaction cache when the cache asks for it, then wait on the reader thread.
// NOTE(review): any early return between the reload and the wait is not visible here.
void MSTrans::txn_PerformIdleTasks()
868
if (txn_TransCache->tc_ShoulReloadCache()) {
869
txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload());
870
txn_TransCache->tc_CompleteCacheReload();
874
// During backup the reader is suspended. This may need to be changed
875
// if we decide to actually do something here.
876
txn_reader->suspendedWait(1000);
881
// Advances the log's read (start) position to pos.
// When pos lies in the overflow area, the overflow records are absorbed into
// the main list: txn_MaxRecords grows to txn_Overflow and the overflow flag is
// cleared in memory and on disk. The header's start/EOL values are written
// when the position rolled over or the start checkpoint counter has run out.
// NOTE(review): the assignment of txn_Start in the non-overflow case and some
// other lines are not visible in this view.
void MSTrans::txn_ResetReadPosition(uint64_t pos)
883
// A new position below the current start means the read position wrapped around.
bool rollover = (pos < txn_Start);
886
if (pos >= txn_MaxRecords) { // Start of overflow
889
// Overflow has occurred and the circular list is now empty
890
// so expand the list to include the overflow and
891
// reset txn_Start and txn_EOL
892
txn_Start = txn_MaxRecords;
893
txn_MaxRecords = txn_Overflow;
895
txn_HaveOverflow = false;
898
CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
899
CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
900
txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
901
txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
909
ASSERT(txn_Start <= txn_MaxRecords);
912
txn_StartCheckPoint -= (pos - txn_Start);
914
// Flush the header if the read position has rolled over or it is time.
915
if ( rollover || (txn_StartCheckPoint <=0)) {
917
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
918
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
919
// 16 bytes: presumably th_start_8 immediately followed by th_eol_8 — confirm against the header layout.
txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
923
txn_StartCheckPoint = txn_MaxCheckPoint;
929
if (TRANS_CAN_RESIZE)
935
// Asks the transaction cache for the next transaction and whether it is terminated.
// NOTE(review): the return statement is not visible in this view.
bool MSTrans::txn_haveNextTransaction()
937
bool terminated = false;
940
txn_TransCache->tc_GetTransaction(&ref, &terminated);
946
// Hands the log reader the next record of the current terminated transaction.
// Must be called by the reader thread (asserted). While no terminated
// transaction is available, or a backup is in progress, the thread performs
// idle tasks until one appears or it is asked to quit. Once all records of a
// transaction have been delivered, the transaction is freed from the cache,
// the log's read position is advanced and the cache is reloaded if required.
// NOTE(review): locking calls, else branches and returns are not visible in this view.
void MSTrans::txn_GetNextTransaction(MSTransPtr tran, MS_TxnState *state)
949
uint64_t log_position;
952
ASSERT(txn_reader == self);
956
// Get the next completed transaction.
957
// This will suspend the current thread, which is assumed
958
// to be the log reader, until one is available.
959
while ((!txn_IsTxnValid) && !self->myMustQuit) {
961
// Wait until backup has completed.
962
while (txn_Doingbackup && !self->myMustQuit)
963
txn_PerformIdleTasks();
965
if (txn_TransCache->tc_GetTransaction(&txn_CurrentTxn, &terminated) && terminated) {
966
txn_IsTxnValid = true;
969
txn_PerformIdleTasks();
972
if (self->myMustQuit)
975
// Fetch the next record of the current transaction; txn_TxnIndex advances on every call.
if (txn_TransCache->tc_GetRecAt(txn_CurrentTxn, txn_TxnIndex++, tran, state))
979
txn_TransCache->tc_FreeTransaction(txn_CurrentTxn);
981
if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) {
982
txn_ResetReadPosition(log_position);
984
if (txn_TransCache->tc_ShoulReloadCache()) {
985
uint64_t pos = txn_TransCache->tc_StartCacheReload();
986
txn_ResetReadPosition(pos);
987
txn_LoadTransactionCache(pos);
988
txn_TransCache->tc_CompleteCacheReload();
990
// Lock the object to prevent writer thread updates while I check again.
991
// This is to ensure that txn_EOL is not changed between the call to
992
// tc_GetTransactionStartPosition() and setting the read position.
994
if (txn_TransCache->tc_GetTransactionStartPosition(&log_position))
995
txn_ResetReadPosition(log_position);
997
txn_ResetReadPosition(txn_EOL);
1002
txn_IsTxnValid = false;
1006
unlock_(txn_reader);
1011
// Fills stats with the current state of the log and its transaction cache.
// ts_LogSize is the overflow position while overflowing, otherwise the number
// of records in the log; ts_PercentFull relates it to the requested list size
// stored in the header.
void MSTrans::txn_GetStats(MSTransStatsPtr stats)
1014
if (txn_HaveOverflow) {
1015
stats->ts_IsOverflowing = true;
1016
stats->ts_LogSize = txn_Overflow;
1018
stats->ts_IsOverflowing = false;
1019
stats->ts_LogSize = txn_GetNumRecords();
1021
// NOTE(review): divides by the header's requested list size — assumes it is never zero; confirm.
stats->ts_PercentFull = (stats->ts_LogSize * 100) / CS_GET_DISK_8(txn_DiskHeader.th_requested_list_size_8);
1023
stats->ts_MaxSize = txn_HighWaterMark;
1024
stats->ts_OverflowCount = txn_OverflowCount;
1026
stats->ts_TransCacheSize = txn_TransCache->tc_GetCacheUsed();
1027
stats->ts_PercentTransCacheUsed = txn_TransCache->tc_GetPercentCacheUsed();
1028
stats->ts_PercentCacheHit = txn_TransCache->tc_GetPercentCacheHit();
1031
// Stores new_size as the requested cache size in the log header on disk and
// applies it to the transaction cache.
// NOTE(review): the lock calls matching the unlock below are not visible in this view.
void MSTrans::txn_SetCacheSize(uint32_t new_size)
1034
// Important lock order. Writer threads never lock the reader but the reader
1035
// may lock this object so always lock the reader first.
1039
CS_SET_DISK_4(txn_DiskHeader.th_requested_cache_size_4, new_size);
1041
txn_File->write(&(txn_DiskHeader.th_requested_cache_size_4), offsetof(MSDiskTransHeadRec, th_requested_cache_size_4), 4);
1045
txn_TransCache->tc_SetSize(new_size);
1048
unlock_(txn_reader);
1052
// Converts new_size (a size in bytes, header included) into a requested record
// count, enforces a minimum of 10 records, and stores the request in the
// header on disk.
// NOTE(review): the lock calls matching the unlock below, and any call that
// triggers the actual resize, are not visible in this view.
void MSTrans::txn_SetLogSize(uint64_t new_size)
1056
// Important lock order. Writer threads never lock the reader but the reader
1057
// may lock this object so always lock the reader first.
1061
// NOTE(review): if new_size is smaller than the header size this unsigned
// subtraction wraps to a huge value that the minimum check below does not catch — confirm callers.
txn_ReqestedMaxRecords = (new_size - sizeof(MSDiskTransHeadRec)) / sizeof(MSDiskTransRec);
1063
if (txn_ReqestedMaxRecords < 10)
1064
txn_ReqestedMaxRecords = 10;
1066
CS_SET_DISK_8(txn_DiskHeader.th_requested_list_size_8, txn_ReqestedMaxRecords);
1068
txn_File->write(&(txn_DiskHeader.th_requested_list_size_8), offsetof(MSDiskTransHeadRec, th_requested_list_size_8), 8);
1073
unlock_(txn_reader);
1078
// A helper class for resetting database IDs in the transaction log.
// It reuses ReadTXNLog's log scan but overrides the per-record hooks: the scan
// never stops early, and records whose database ID matches sdb_db_id are
// written back to the log via rl_Store().
// NOTE(review): the statements that modify the record and set sdb_db_id /
// sdb_isDirty are not visible in this view.
1079
class DBSearchTXNLog : ReadTXNLog {
1081
DBSearchTXNLog(MSTrans *log): ReadTXNLog(log), sdb_db_id(0), sdb_isDirty(false) {}
1086
// Always scan the whole log.
virtual bool rl_CanContinue() { return true;}
1087
virtual void rl_Load(uint64_t log_position, MSTransPtr rec)
1089
if (rec->tr_db_id == sdb_db_id) {
1092
rl_Store(log_position, rec);
1096
void SetDataBaseIDToZero(uint32_t db_id)
1099
// Scan from the log's start position.
rl_ReadLog(rl_log->txn_GetStartPosition(), false);
1105
// Dropping the database from the transaction log just involves
1106
// scanning the log and setting the database ID of any transactions
1107
// involving the dropped database to zero.
1108
void MSTrans::txn_dropDatabase(uint32_t db_id)
1112
// Important lock order. Writer threads never lock the reader but the reader
1113
// may lock this object so always lock the reader first.
1117
// Clear any transaction records in the cache for the dropped database.
1118
txn_TransCache->tc_dropDatabase(db_id);
1120
// Scan the log setting the database ID for any record belonging to the
1121
// dropped database to zero.
1122
DBSearchTXNLog searchLog(this);
1124
searchLog.SetDataBaseIDToZero(db_id);
1127
unlock_(txn_reader);
1132
// Writes a human-readable dump of the log records to the given file.
// Records are read from the log in blocks, converted from disk format and
// printed one per line; the lines for the start, EOL and overflow positions
// are tagged "START", "EOL" and "OverFlow".
// NOTE(review): the loop header, the strings assigned to ttype / cmt and the
// end of the function are not visible in this view.
void MSTrans::txn_DumpLog(const char *file)
1134
size_t size, read_start = 0;
1138
fptr = fopen(file, "w+");
1145
// The number of records to dump: up to the overflow position, or the list capacity.
size = txn_Overflow;
1147
size = txn_MaxRecords;
1149
// Dump all the records
1151
MSDiskTransRec diskRecords[1000];
1160
// Read the next block of records.
1161
offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);
1162
txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
1164
for (uint32_t i = 0; i < read_size; i++) {
1165
const char *ttype, *cmt;
1167
MSDiskTransPtr drec = diskRecords + i;
1168
GET_DISK_TRANSREC(&rec, drec);
1170
switch (TRANS_TYPE(rec.tr_type)) {
1171
case MS_ReferenceTxn:
1174
case MS_DereferenceTxn:
1177
case MS_RollBackTxn:
1179
rec.tr_blob_ref_id = 0;
1181
case MS_RecoveredTxn:
1183
rec.tr_blob_ref_id = 0;
1189
if (TRANS_IS_TERMINATED(rec.tr_type))
1195
fprintf(fptr, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" \t %s %s %s\n", rec.tr_id, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id,
1196
((read_start + i) == txn_Start) ? "START":"",
1197
((read_start + i) == txn_EOL) ? "EOL":"",
1198
((read_start + i) == txn_MaxRecords) ? "OverFlow":""
1203
read_start += read_size;