1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
* PBMS transaction handling.
27
* PBMS uses 1 circular transaction log. All BLOB reference operations are written to this log
28
* and are applied to the repository when committed. There is 1 thread dedicated to reading the
29
* transaction log and applying the changes. During an engine level backup this thread is suspended
30
* so that no transactions will be applied to the repository files as they are backed up.
33
#include "cslib/CSConfig.h"
38
#include "cslib/CSGlobal.h"
39
#include "cslib/CSStrUtil.h"
40
#include "cslib/CSStorage.h"
42
#include "trans_log_ms.h"
43
#include "trans_cache_ms.h"
46
uint32_t trans_test_crash_point;
47
// Test hook: when p equals trans_test_crash_point, print the current log
// positions (txn_Start, txn_EOL must be in scope at the expansion site) and
// then crash deliberately by writing through a NULL pointer.
// The format macros are separated from the string literals by spaces because
// C++11 and later parse "..."PRIu64 as a user-defined-literal suffix.
#define CRASH_POINT(p) { if ((p) == trans_test_crash_point) { char *ptr = NULL; printf("Crash on demand at: %s(%d), start: %" PRIu64 ", eol: %" PRIu64 "\n", __FILE__, __LINE__, txn_Start, txn_EOL); *ptr = 88;}}
49
#define CRASH_POINT(p)
52
// Magic number stored in th_magic_4; verified when an existing log is opened.
#define MS_TRANS_LOG_MAGIC 0xA6E7D7B3
53
// On-disk format version stored in th_version_2.
#define MS_TRANS_LOG_VERSION 1
54
// Values of the th_recovered_1 header byte: RECOVERED is written by txn_Close(),
// NOT_RECOVERED is written by txn_NewMSTrans() once the log has been opened.
#define MS_TRANS_LOG_RECOVERED 0XA1
55
#define MS_TRANS_LOG_NOT_RECOVERED 0XA2
56
// Values of the th_overflow_1 header byte.
#define MS_TRANS_NO_OVERFLOW 0XB1
57
#define MS_TRANS_OVERFLOW 0XB2
59
// Initial txn_MaxCheckPoint for a newly created log.
#define DFLT_TRANS_CHECKPOINT_THRESHOLD 1024
61
// Number of records preallocated in a newly created log.
#define DFLT_TRANS_LOG_LIST_SIZE (1024 * 10)
62
// Cache size passed to MSTransCache::newMSTransCache() for a newly created log.
#define DFLT_TRANS_CACHE_SIZE (500)
64
// True when the current log size differs from the requested size, txn_EOL is
// not behind txn_Start, and the log is not overflowing. Must be expanded
// where the MSTrans members are in scope.
#define TRANS_CAN_RESIZE ((txn_MaxRecords != txn_ReqestedMaxRecords) && (txn_EOL >= txn_Start) && !txn_HaveOverflow)
66
// On-disk layout of one transaction log record. Fields are read and written
// through the CS_GET_DISK_n / CS_SET_DISK_n macros.
typedef struct MSDiskTrans {
67
CSDiskValue4 dtr_id_4; // The transaction ID
68
CSDiskValue1 dtr_type_1; // The transaction type. If the first bit is set then the transaction is an autocommit.
69
CSDiskValue1 dtr_check_1; // The transaction record checksum.
70
CSDiskValue4 dtr_db_id_4; // The database ID for the operation.
71
CSDiskValue4 dtr_tab_id_4; // The table ID for the operation.
72
CSDiskValue8 dtr_blob_id_8; // The blob ID for the operation.
73
CSDiskValue8 dtr_blob_ref_id_8; // The blob reference id.
74
} MSDiskTransRec, *MSDiskTransPtr;
76
// Copies every field of the in-memory record s into the on-disk record d.
// NOTE(review): the macro's closing brace is not visible in this excerpt.
#define SET_DISK_TRANSREC(d, s) { \
77
CS_SET_DISK_4((d)->dtr_id_4, (s)->tr_id);\
78
CS_SET_DISK_1((d)->dtr_type_1, (s)->tr_type);\
79
CS_SET_DISK_1((d)->dtr_check_1, (s)->tr_check);\
80
CS_SET_DISK_4((d)->dtr_db_id_4, (s)->tr_db_id);\
81
CS_SET_DISK_4((d)->dtr_tab_id_4, (s)->tr_tab_id);\
82
CS_SET_DISK_8((d)->dtr_blob_id_8, (s)->tr_blob_id);\
83
CS_SET_DISK_8((d)->dtr_blob_ref_id_8, (s)->tr_blob_ref_id);\
86
// Copies every field of the on-disk record d into the in-memory record s.
// Note the argument order: destination (s) first, source (d) second.
// NOTE(review): the macro's closing brace is not visible in this excerpt.
#define GET_DISK_TRANSREC(s, d) { \
87
(s)->tr_id = CS_GET_DISK_4((d)->dtr_id_4);\
88
(s)->tr_type = CS_GET_DISK_1((d)->dtr_type_1);\
89
(s)->tr_check = CS_GET_DISK_1((d)->dtr_check_1);\
90
(s)->tr_db_id = CS_GET_DISK_4((d)->dtr_db_id_4);\
91
(s)->tr_tab_id = CS_GET_DISK_4((d)->dtr_tab_id_4);\
92
(s)->tr_blob_id = CS_GET_DISK_8((d)->dtr_blob_id_8);\
93
(s)->tr_blob_ref_id = CS_GET_DISK_8((d)->dtr_blob_ref_id_8);\
96
// Computes a one-byte checksum for the len bytes starting at data.
// NOTE(review): the declaration of chk and the loop header are not visible in
// this excerpt; chk is initialised to point at the last byte of the buffer.
static uint8_t checksum(uint8_t *data, size_t len)
98
register uint32_t sum = 0, g;
101
chk = data + len - 1;
103
// Shift the running sum up by one nibble and add the current byte.
sum = (sum << 4) + *chk;
104
// g holds the top nibble of the sum; when it is non-zero, mix it back into the low bits.
if ((g = sum & 0xF0000000)) {
105
sum = sum ^ (g >> 24);
110
// XOR-fold the four bytes of the 32-bit sum into a single byte.
return (uint8_t) (sum ^ (sum >> 24) ^ (sum >> 16) ^ (sum >> 8));
115
txn_MaxCheckPoint(0),
116
txn_Doingbackup(false),
118
txn_IsTxnValid(false),
121
txn_StartCheckPoint(0),
122
txn_TransCache(NULL),
123
txn_BlockingTransaction(0),
125
txn_EOLCheckPoint(0),
127
txn_ReqestedMaxRecords(0),
128
txn_HighWaterMark(0),
129
txn_OverflowCount(0),
131
txn_Recovered(false),
132
txn_HaveOverflow(false),
144
txn_TransCache->release();
148
// Writes the final header state to the log file: the next transaction ID,
// the start/EOL positions and checksum value, and finally the "recovered" flag.
// NOTE(review): the surrounding control flow is not visible in this excerpt.
void MSTrans::txn_Close()
152
// Persist the next transaction ID (txn_MaxTID) in the header.
CS_SET_DISK_4(txn_DiskHeader.th_next_txn_id_4, txn_MaxTID);
154
txn_File->write(&(txn_DiskHeader.th_next_txn_id_4), offsetof(MSDiskTransHeadRec, th_next_txn_id_4), 4 );
156
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
157
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
158
CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
// Write everything from th_start_8 to the end of the header in one call.
159
txn_File->write(&(txn_DiskHeader.th_start_8),
160
offsetof(MSDiskTransHeadRec, th_start_8),
161
sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
167
// Write the recovered flag separately just in case of a crash during the write operation.
168
CS_SET_DISK_1(txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
169
txn_File->write(&(txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1 );
179
void MSTrans::txn_SetFile(CSFile *tr_file)
186
static FILE *txn_debug_log;
189
// Factory for the transaction log object.
// If no file exists at log_path, a new log is created, preallocated with
// DFLT_TRANS_LOG_LIST_SIZE zeroed records and given a fresh header.
// Otherwise the existing header is read and validated, and the log is
// recovered if required. In both cases the header is then marked
// MS_TRANS_LOG_NOT_RECOVERED and the transaction cache is loaded.
// NOTE(review): many lines of this function (declarations, braces, cleanup,
// the use of dump_log, the return) are not visible in this excerpt.
MSTrans *MSTrans::txn_NewMSTrans(const char *log_path, bool dump_log)
191
MSTrans *trans = NULL;
198
new_(trans, MSTrans());
201
path = CSPath::newPath(log_path);
207
if (!path->exists()) { // Create the transaction log.
208
// Preallocate the log space and initialize it.
209
MSDiskTransRec *recs;
210
off64_t offset = sizeof(MSDiskTransHeadRec);
211
uint64_t num_records = DFLT_TRANS_LOG_LIST_SIZE;
215
// A zero-filled buffer of 1024 records is used to write the log in chunks.
recs = (MSDiskTransRec *) cs_calloc(1024 * sizeof(MSDiskTransRec));
218
tr_file = path->createFile(CSFile::CREATE);
221
log_size = DFLT_TRANS_LOG_LIST_SIZE * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
224
while (num_records) {
225
if (num_records < 1024)
229
tr_file->write(recs, offset, size * sizeof(MSDiskTransRec));
230
offset += size * sizeof(MSDiskTransRec);
234
trans->txn_MaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
235
trans->txn_ReqestedMaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
236
trans->txn_MaxCheckPoint = DFLT_TRANS_CHECKPOINT_THRESHOLD;
237
trans->txn_MaxTID = 1;
239
// Initialize the log header.
240
CS_SET_DISK_4(trans->txn_DiskHeader.th_magic_4, MS_TRANS_LOG_MAGIC);
241
CS_SET_DISK_2(trans->txn_DiskHeader.th_version_2, MS_TRANS_LOG_VERSION);
243
CS_SET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4, trans->txn_MaxTID);
245
CS_SET_DISK_2(trans->txn_DiskHeader.th_check_point_2, trans->txn_MaxCheckPoint);
247
CS_SET_DISK_8(trans->txn_DiskHeader.th_list_size_8, trans->txn_MaxRecords);
248
CS_SET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8, trans->txn_ReqestedMaxRecords);
250
CS_SET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4, DFLT_TRANS_CACHE_SIZE);
252
CS_SET_DISK_8(trans->txn_DiskHeader.th_start_8, 0);
253
CS_SET_DISK_8(trans->txn_DiskHeader.th_eol_8, 0);
255
CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
256
CS_SET_DISK_1(trans->txn_DiskHeader.th_checksum_1, 1);
257
CS_SET_DISK_1(trans->txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
259
tr_file->write(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec));
261
trans->txn_SetFile(tr_file);
263
trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
265
trans->txn_TransCache = MSTransCache::newMSTransCache(DFLT_TRANS_CACHE_SIZE);
269
} else { // The transaction log already exists
270
bool overflow = false, recovered = false;
272
CSFile *tr_file = path->createFile(CSFile::DEFAULT); // Open read/write
275
// Read the log header:
276
if (tr_file->read(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec), 0) < sizeof(MSDiskTransHeadRec)) {
282
// check the log header:
283
if (CS_GET_DISK_4(trans->txn_DiskHeader.th_magic_4) != MS_TRANS_LOG_MAGIC)
284
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_HEADER_MAGIC);
286
// NOTE(review): any version mismatch, older or newer, is reported as "too new".
if (CS_GET_DISK_2(trans->txn_DiskHeader.th_version_2) != MS_TRANS_LOG_VERSION)
287
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_VERSION_TOO_NEW);
290
// The overflow byte must hold one of the two known values.
if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_NO_OVERFLOW)
292
else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_OVERFLOW)
295
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
298
// The recovered byte must hold one of the two known values.
if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_NOT_RECOVERED)
300
else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_RECOVERED)
303
CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
305
// Check that the log is the expected size.
306
log_size = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8) * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
308
// The file may only be larger than the computed size when the log has overflowed.
if ((log_size > tr_file->getEOF()) ||
309
((log_size < tr_file->getEOF()) && !overflow)){
311
char buffer[CS_EXC_MESSAGE_SIZE];
312
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unexpected transaction log size: ");
313
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, path->getCString());
314
CSException::throwException(CS_CONTEXT, CS_ERR_BAD_FILE_HEADER, buffer);
317
trans->txn_MaxTID = CS_GET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4);
319
// Looks good, we will assume it is a valid log file.
320
trans->txn_TransCache = MSTransCache::newMSTransCache(CS_GET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4));
323
trans->txn_SetFile(tr_file);
325
trans->txn_MaxCheckPoint = CS_GET_DISK_2(trans->txn_DiskHeader.th_check_point_2);
327
trans->txn_MaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8);
328
trans->txn_ReqestedMaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8);
330
trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
331
trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
332
trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
333
trans->txn_HaveOverflow = overflow;
335
// txn_Overflow is derived from the actual file size, expressed in records.
trans->txn_Overflow = (tr_file->getEOF() - sizeof(MSDiskTransHeadRec)) /sizeof(MSDiskTransRec);
337
trans->txn_Overflow = 0;
341
printf("Recovering overflow log\n");
344
snprintf(name, 100, "%dms-trans-log.dump", (int)time(NULL));
345
trans->txn_DumpLog(name);
348
// Recover the log if required.
350
trans->txn_Recover();
355
trans->txn_Recovered = true; // Any recovery required has been completed.
357
// The log has been recovered so these values should be valid:
358
trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
359
trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
361
// Set the header to indicate that the log has not been closed properly.
362
// This is reset when the log is closed during shutdown.
363
CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_NOT_RECOVERED);
364
trans->txn_File->write(&(trans->txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1);
366
// Load the transaction records into memory.
367
trans->txn_TransCache->tc_StartCacheReload(true);
368
trans->txn_LoadTransactionCache(trans->txn_Start);
369
trans->txn_TransCache->tc_CompleteCacheReload();
371
if (trans->txn_MaxRecords != trans->txn_ReqestedMaxRecords)
372
trans->txn_ResizeLog(); // Try to resize but it may not be possible yet.
379
// Open the debug trace file; a failure is only reported via perror().
txn_debug_log = fopen("log_dump.txt", "w+");
380
if (!txn_debug_log) {
381
perror("log_dump.txt");
388
// Verifies the checksum of an in-memory transaction record.
// The stored checksum is saved, tr_check is temporarily replaced by
// txn_Checksum (the value it held when the checksum was computed), the
// checksum is recomputed and compared, and tr_check is then restored.
bool MSTrans::txn_ValidRecord(MSTransPtr rec)
390
uint8_t check = rec->tr_check;
393
rec->tr_check = txn_Checksum;
394
ok = (checksum((uint8_t*)rec, sizeof(MSTransRec)) == check);
395
rec->tr_check = check;
399
// Reads the record at position index (counted in records after the file
// header) from the log file and converts it into the in-memory form in rec.
void MSTrans::txn_GetRecordAt(uint64_t index, MSTransPtr rec)
404
// Read 1 record from the log and convert it from disk format.
405
offset = sizeof(MSDiskTransHeadRec) + index * sizeof(MSDiskTransRec);
406
txn_File->read(&drec, offset, sizeof(MSDiskTransRec), sizeof(MSDiskTransRec));
407
GET_DISK_TRANSREC(rec, &drec);
410
// Recovery involves finding the start of the first record and the eof
411
// position. The positions will be found at or after the position stored
413
// Recovers the log after it was not closed properly:
//  1. advances txn_EOL from its stored value to the first invalid record,
//  2. repositions txn_Start on a record that starts a transaction,
//  3. reloads the transaction cache and adds MS_RecoveredTxn records for
//     unterminated transactions,
//  4. updates the EOL/start values held in txn_DiskHeader.
// NOTE(review): several lines (braces, loop exits, header writes) are not
// visible in this excerpt.
void MSTrans::txn_Recover()
415
MSTransRec rec = {0,0,0,0,0,0,0};
416
uint64_t original_eol = txn_EOL;
420
printf("Recovering transaction log!\n");
424
// Search for the last valid record in the log starting from the last
425
// known position stored in the header.
426
for (; txn_EOL < txn_MaxRecords; txn_EOL++) {
427
txn_GetRecordAt(txn_EOL, &rec);
428
if (! txn_ValidRecord(&rec))
432
if (txn_EOL == txn_MaxRecords) {
433
// It looks like all the records in the log are valid?
434
// This is strange but could happen if the crash
435
// occurred just before updating the header as the
436
// eol position rolled over to the top of the log.
442
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
444
// If the actual eol has moved past the recorded start position
445
// then the actual start position must be somewhere beyond
447
if (((original_eol < txn_Start) || (original_eol > txn_EOL)) && (txn_EOL >= txn_Start))
448
txn_Start = txn_EOL +1;
450
// Position the start at the beginning of a transaction.
451
uint64_t end_search = (txn_Start < txn_EOL)? txn_EOL : txn_MaxRecords;
452
for (; txn_Start < end_search; txn_Start++) {
453
txn_GetRecordAt(txn_Start, &rec);
454
if (TRANS_IS_START(rec.tr_type))
458
if (txn_Start == end_search)
461
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
463
txn_TransCache->tc_SetRecovering(true);
464
// Load the transaction records into the cache.
465
txn_TransCache->tc_StartCacheReload(true);
466
txn_LoadTransactionCache(txn_Start);
467
txn_TransCache->tc_CompleteCacheReload();
469
// Now go through all the transactions and add rollbacks for any
470
// unterminated transactions.
473
while (txn_TransCache->tc_GetTransaction(&ref, &terminated)) {
475
txn_MaxTID = txn_TransCache->tc_GetTransactionID(ref); // Save the TID of the last transaction.
477
// Set up the thread's transaction state so that txn_AddTransaction() appends
// the record to this existing transaction rather than starting a new one.
self->myTID = txn_MaxTID;
478
self->myTransRef = ref;
479
self->myStartTxn = false;
480
txn_AddTransaction(MS_RecoveredTxn);
483
txn_TransCache->tc_FreeTransaction(ref);
485
// Load the next block of transactions into the cache.
486
// This needs to be done after each tc_GetTransaction() to make sure
487
// that if the transaction terminator is somewhere in the log
488
// it will get read even if the cache is completely full.
489
if (txn_TransCache->tc_ShoulReloadCache()) {
490
txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload(true));
491
txn_TransCache->tc_CompleteCacheReload();
496
txn_TransCache->tc_SetRecovering(false);
497
self->myTransRef = 0;
499
// Update the header again in case rollbacks have been added.
500
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
505
// Default continuation test for rl_ReadLog(): defers to the transaction
// cache's tc_ContinueCacheReload().
bool ReadTXNLog::rl_CanContinue()
507
return rl_log->txn_TransCache->tc_ContinueCacheReload();
510
// Default per-record handler for rl_ReadLog(): adds the record read from
// log_position to the transaction cache.
void ReadTXNLog::rl_Load(uint64_t log_position, MSTransPtr rec)
512
rl_log->txn_TransCache->tc_AddRec(log_position, rec);
515
// Converts rec to disk format and writes it back to the log file at record
// position log_position.
void ReadTXNLog::rl_Store(uint64_t log_position, MSTransPtr rec)
518
SET_DISK_TRANSREC(&drec, rec);
520
rl_log->txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + log_position * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec));
523
// Flushes and then syncs the log file.
void ReadTXNLog::rl_Flush()
525
rl_log->txn_File->flush();
526
rl_log->txn_File->sync();
529
// Reads log records starting at record position read_start, in blocks of up
// to 1000 disk records, converts each one and hands it to rl_Load() for as
// long as rl_CanContinue() returns true.
// A read_start at or beyond txn_MaxRecords means the overflow area is read.
// After the circular part has been read, the overflow area (if any) is read
// recursively. Otherwise, when log_locked is false, the record count is
// re-checked and reading continues (with log_locked = true) if it changed.
// NOTE(review): locking and several control-flow lines are not visible in
// this excerpt.
void ReadTXNLog::rl_ReadLog(uint64_t read_start, bool log_locked)
531
uint64_t size, orig_size;
532
bool reading_overflow = (read_start >= rl_log->txn_MaxRecords);
536
// Get the number of transaction records to be loaded.
537
if (reading_overflow) {
538
orig_size = rl_log->txn_Overflow;
539
size = rl_log->txn_Overflow - read_start;
541
orig_size = rl_log->txn_GetNumRecords();
543
if (rl_log->txn_Start <= read_start)
544
size = orig_size - (read_start - rl_log->txn_Start);
546
size = rl_log->txn_EOL - read_start;
549
// load all the records
550
while (size && rl_CanContinue()) {
551
MSDiskTransRec diskRecords[1000];
560
// Check if we have reached the wrap around point in the log.
561
if ((!reading_overflow) && (rl_log->txn_EOL < read_start) && ((rl_log->txn_MaxRecords - read_start) < read_size))
562
read_size = rl_log->txn_MaxRecords - read_start ;
564
// Read the next block of records.
565
offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);
566
rl_log->txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
568
// Convert the records from disk format and add them to the cache.
569
for (uint32_t i = 0; i < read_size && rl_CanContinue(); i++) {
571
MSDiskTransPtr drec = diskRecords + i;
572
GET_DISK_TRANSREC(&rec, drec);
574
rl_Load(read_start + i, &rec);
578
read_start += read_size;
579
if (read_start == rl_log->txn_MaxRecords)
583
if (rl_log->txn_HaveOverflow && !reading_overflow) {
584
if (rl_CanContinue())
585
rl_ReadLog(rl_log->txn_MaxRecords, false);
587
} else if (!log_locked) {
588
// The following is intended to prevent the case where a writer
589
// writes a txn record while the cache is full but just after
590
// the reload has completed. If the cache is not yet full we need
591
// to load as many of the new records into cache as possible.
595
if (reading_overflow)
596
new_size = rl_log->txn_Overflow;
598
new_size = rl_log->txn_GetNumRecords();
599
if (rl_CanContinue() && (orig_size != new_size)) {
600
rl_ReadLog(read_start, true);
609
// Loads log records into the transaction cache, starting at record position
// read_start, using a default ReadTXNLog reader, and then bumps the cache
// version.
void MSTrans::txn_LoadTransactionCache(uint64_t read_start)
611
ReadTXNLog log(this);
613
log.rl_ReadLog(read_start, false);
614
txn_TransCache->tc_UpdateCacheVersion(); // Signal writes to recheck cache for overflow txn refs.
618
// Moves the log size (txn_MaxRecords) towards the requested size
// (txn_ReqestedMaxRecords) when TRANS_CAN_RESIZE allows it.
// Shrinking is limited to the space that can be released after txn_EOL (or
// the whole log when it is empty, i.e. txn_Start == txn_EOL); growing goes
// straight to the requested size. The new size is logged, stored in the
// header and applied to the file with setEOF().
// NOTE(review): several lines (locking, braces, the position adjustments at
// the end) are not visible in this excerpt.
void MSTrans::txn_ResizeLog()
623
if (TRANS_CAN_RESIZE) {
624
// TRANS_CAN_RESIZE checks that there is no overflow and that the start position
625
// is less than eol. This implies that from eol to the end of file doesn't contain
630
uint64_t old_size = txn_MaxRecords;
632
if (txn_MaxRecords > txn_ReqestedMaxRecords) { // Shrink the log
633
uint64_t max_resize = txn_MaxRecords - txn_EOL;
635
if ( txn_Start == txn_EOL)
636
max_resize = txn_MaxRecords;
638
max_resize = txn_MaxRecords - txn_EOL;
639
if (!txn_Start) // If start is at '0' then the EOL cannot be wrapped.
644
// Never shrink below the requested size.
if (max_resize > (txn_MaxRecords - txn_ReqestedMaxRecords))
645
max_resize = txn_MaxRecords - txn_ReqestedMaxRecords;
647
txn_MaxRecords -= max_resize;
649
txn_MaxRecords = txn_ReqestedMaxRecords; // Grow the log
652
char buffer[CS_EXC_MESSAGE_SIZE];
653
// The PRIu64 macros are separated from the literals by spaces: C++11 and
// later would otherwise parse them as user-defined-literal suffixes.
snprintf(buffer, CS_EXC_MESSAGE_SIZE, "Resizing the Transaction log from %" PRIu64 " to %" PRIu64 " \n", old_size, txn_MaxRecords);
654
CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, buffer);
657
CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
659
txn_File->setEOF(txn_MaxRecords * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec));
660
txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
662
if (txn_Start == txn_EOL) {
665
} else if (txn_MaxRecords == txn_EOL) {
677
// Resets both checkpoint counters to txn_MaxCheckPoint and writes the
// start/EOL positions and checksum value to the header on disk.
// NOTE(review): the lines that change txn_EOL/txn_Start themselves are not
// visible in this excerpt.
void MSTrans::txn_ResetEOL()
681
txn_EOLCheckPoint = txn_MaxCheckPoint;
682
txn_StartCheckPoint = txn_MaxCheckPoint;
686
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
687
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
688
CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
// Write everything from th_start_8 to the end of the header in one call.
689
txn_File->write(&(txn_DiskHeader.th_start_8),
690
offsetof(MSDiskTransHeadRec, th_start_8),
691
sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
700
// PRINT_TRANS has a no-op definition and a tracing definition that forwards
// to printTrans().
// NOTE(review): the preprocessor conditional selecting between them is not
// visible in this excerpt.
#define PRINT_TRANS(tid, a, t)
703
#define PRINT_TRANS(tid, a, t) printTrans(tid, a, t)
704
// Writes a one-line trace of a logged transaction to stderr: the transaction
// ID, the autocommit flag and a readable name for the transaction type
// ("???" unless a case below assigns a name).
static void printTrans(uint32_t tid, bool autocommit, MS_Txn type)
706
const char *type_name = "???";
710
type_name = "Rollback";
712
case MS_PartialRollBackTxn:
713
type_name = "PartialRollBack";
716
type_name = "Commit";
718
case MS_ReferenceTxn:
719
type_name = "Reference";
721
case MS_DereferenceTxn:
722
type_name = "Dereference";
724
case MS_RecoveredTxn:
725
type_name = "Recovered";
729
fprintf(stderr, "MSTrans::txn_LogTransaction(%d, autocommit = %s, %s)\n", tid, (autocommit)?"On":"Off", type_name);
734
// Public entry point for logging one transaction record on behalf of the
// calling thread (self). The record is appended with txn_AddTransaction();
// if it is an autocommit or its type terminates the transaction, the
// thread's transaction state is reset with txn_NewTransaction().
// NOTE(review): the switch header, the condition guarding the new-transaction
// setup and the locking are not visible in this excerpt.
void MSTrans::txn_LogTransaction(MS_Txn type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
742
case MS_PartialRollBackTxn:
747
case MS_ReferenceTxn:
748
case MS_DereferenceTxn:
749
case MS_RecoveredTxn:
753
// Set up the thread state for a new transaction: take the ID from txn_MaxTID
// and flag the next record as the transaction start.
self->myTID = txn_MaxTID;
754
self->myTransRef = TRANS_CACHE_NEW_REF;
755
self->myStartTxn = true;
758
PRINT_TRANS(self->myTID, autocommit, type);
760
txn_AddTransaction(type, autocommit, db_id, tab_id, blob_id, blob_ref_id);
761
if (autocommit || TRANS_TYPE_IS_TERMINATED(type))
762
txn_NewTransaction();
769
// Builds one transaction record for the calling thread (self), checksums it,
// writes it to the log file and adds it to the transaction cache.
//   tran_type    record type; start/autocommit flag bits are added here
//   autocommit   whether the record is an autocommit
//   db_id, tab_id, blob_id, blob_ref_id   the BLOB reference being logged
// The record normally goes to txn_EOL. Once the log has overflowed, records
// are written at txn_Overflow instead; the first overflow record also marks
// the header with MS_TRANS_OVERFLOW and saves the start/EOL positions.
// NOTE(review): many lines (declarations, conditions, braces, preprocessor
// conditionals around the debug and crash-test code) are not visible in this
// excerpt.
void MSTrans::txn_AddTransaction(uint8_t tran_type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
771
MSTransRec rec = {0,0,0,0,0,0,0}; // This must be set to zero so that the checksum will be valid.
773
uint64_t new_offset = txn_EOL;
774
bool do_flush = true;
779
// Check that the log is not already full.
781
if (!txn_HaveOverflow) { // The first overflow record: update the header.
782
CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_OVERFLOW);
783
txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
785
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
786
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
787
// 16 bytes: th_start_8 and th_eol_8 are written together.
txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
791
txn_HaveOverflow = true;
793
txn_Overflow = txn_MaxRecords;
796
new_offset = txn_Overflow;
799
rec.tr_id = self->myTID ;
800
rec.tr_type = tran_type;
801
rec.tr_db_id = db_id;
802
rec.tr_tab_id = tab_id;
803
rec.tr_blob_id = blob_id;
804
rec.tr_blob_ref_id = blob_ref_id;
806
// Flag the first record of a transaction as its start.
if (self->myStartTxn) {
807
TRANS_SET_START(rec.tr_type);
808
self->myStartTxn = false;
812
TRANS_SET_AUTOCOMMIT(rec.tr_type);
818
switch (TRANS_TYPE(rec.tr_type)) {
819
case MS_ReferenceTxn:
822
case MS_DereferenceTxn:
827
rec.tr_blob_ref_id = 0;
829
case MS_RecoveredTxn:
831
rec.tr_blob_ref_id = 0;
837
if (TRANS_IS_TERMINATED(rec.tr_type))
842
// The PRIu32/PRIu64 macros are separated from the literals by spaces: C++11
// and later would otherwise parse them as user-defined-literal suffixes.
fprintf(txn_debug_log, "%" PRIu32 " \t\t%s%s %" PRIu64 " %" PRIu32 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %d\n", self->myTID, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, txn_Start, txn_EOL, new_offset, txn_HaveOverflow);
846
// tr_check holds txn_Checksum while the checksum is computed;
// txn_ValidRecord() repeats this when verifying.
rec.tr_check = txn_Checksum;
848
// Calculate the records checksum.
849
rec.tr_check = checksum((uint8_t*)&rec, sizeof(rec));
851
// Write the record to disk.
852
SET_DISK_TRANSREC(&drec, &rec);
855
if (trans_test_crash_point == 9) { // do a partial write before crashing
856
txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec)/2 );
859
txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
861
txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
864
// There is no need to sync if the transaction is still running.
865
if (TRANS_IS_TERMINATED(tran_type)) {
866
CRASH_POINT(4); // This crash will result in a verify error because the txn was committed to the log but not the database.
872
if (!txn_HaveOverflow) { // No need to update the header if overflowing.
873
uint64_t rec_offset = txn_EOL;
875
txn_EOL = new_offset;
878
if (txn_EOL == txn_MaxRecords) {
879
// The eol has rolled over.
884
if ((!txn_EOLCheckPoint) || !txn_EOL) {
886
// Flush the previous write if required before updating the header.
887
// This is just in case it crashes during the sync to make sure that the
888
// header information is correct for the data on disk. If the crash occurred
889
// between writing the header and the record the header on disk would be wrong.
898
txn_TransCache->tc_AddRec(rec_offset, &rec, self->myTransRef);
900
if (txn_GetNumRecords() > txn_HighWaterMark)
901
txn_HighWaterMark = txn_GetNumRecords();
903
} else { // Overflow
904
txn_TransCache->tc_AddRec(txn_Overflow, &rec, self->myTransRef);
906
if (txn_Overflow > txn_HighWaterMark)
907
txn_HighWaterMark = txn_Overflow;
910
ASSERT(txn_EOL < txn_MaxRecords);
911
ASSERT(txn_Start < txn_MaxRecords);
915
// Returns the size in bytes of the non-overflow log: the header plus
// txn_MaxRecords disk records.
uint64_t MSTrans::txn_GetSize()
917
return sizeof(MSDiskTransHeadRec) + txn_MaxRecords * sizeof(MSDiskTransRec);
921
// Clears the calling thread's transaction ID so that the next logged record
// begins a new transaction.
// NOTE(review): other lines of this function are not visible in this excerpt.
void MSTrans::txn_NewTransaction()
925
self->myTID = 0; // This will be assigned when the first record is written.
931
// Work done while waiting for a transaction: reloads the transaction cache
// when the cache asks for it, then waits on the reader via
// suspendedWait(1000).
// NOTE(review): the control flow between the reload and the wait is not
// visible in this excerpt.
void MSTrans::txn_PerformIdleTasks()
935
if (txn_TransCache->tc_ShoulReloadCache()) {
936
txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload());
937
txn_TransCache->tc_CompleteCacheReload();
941
// During backup the reader is suspended. This may need to be changed
942
// if we decide to actually do something here.
943
txn_reader->suspendedWait(1000);
948
// Moves the log's start (read) position to pos.
// If pos lies in the overflow area, the overflow is absorbed: the circular
// list is extended to txn_Overflow records, the overflow flag is cleared and
// the header is updated. The start/EOL positions are written to the header
// when the position has rolled over or the start checkpoint counter has run
// out.
// NOTE(review): several lines (locking, the assignment of txn_Start in the
// normal case, the resize call) are not visible in this excerpt.
void MSTrans::txn_ResetReadPosition(uint64_t pos)
950
// A position below the current start means the read position wrapped around.
bool rollover = (pos < txn_Start);
953
if (pos >= txn_MaxRecords) { // Start of overflow
956
// Overflow has occurred and the circular list is now empty
957
// so expand the list to include the overflow and
958
// reset txn_Start and txn_EOL
959
txn_Start = txn_MaxRecords;
960
txn_MaxRecords = txn_Overflow;
962
txn_HaveOverflow = false;
965
CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
966
CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
967
txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
968
txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
976
ASSERT(txn_Start <= txn_MaxRecords);
979
// Count down the checkpoint by the number of records skipped.
txn_StartCheckPoint -= (pos - txn_Start);
981
// Flush the header if the read position has rolled over or it is time.
982
if ( rollover || (txn_StartCheckPoint <=0)) {
984
CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
985
CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
986
txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
990
txn_StartCheckPoint = txn_MaxCheckPoint;
996
if (TRANS_CAN_RESIZE)
1002
// Queries the transaction cache for the next transaction and whether it is
// terminated.
// NOTE(review): the return statement is not visible in this excerpt, so how
// the result is derived from terminated cannot be confirmed here.
bool MSTrans::txn_haveNextTransaction()
1004
bool terminated = false;
1007
txn_TransCache->tc_GetTransaction(&ref, &terminated);
1013
// Called by the log reader thread (asserted below) to fetch the next record
// of the next terminated transaction into tran/state.
// The call waits, performing idle tasks, until a terminated transaction is
// available or the thread is asked to quit. When the current transaction has
// no more records it is freed, the log read position is advanced and the
// cache is reloaded if required.
// NOTE(review): several lines (locking, braces, returns) are not visible in
// this excerpt.
void MSTrans::txn_GetNextTransaction(MSTransPtr tran, MS_TxnState *state)
1016
uint64_t log_position;
1019
ASSERT(txn_reader == self);
1023
// Get the next completed transaction.
1024
// this will suspend the current thread, which is assumed
1025
// to be the log reader, until one is available.
1026
while ((!txn_IsTxnValid) && !self->myMustQuit) {
1028
// wait until backup has completed.
1029
while (txn_Doingbackup && !self->myMustQuit)
1030
txn_PerformIdleTasks();
1032
if (txn_TransCache->tc_GetTransaction(&txn_CurrentTxn, &terminated) && terminated) {
1033
txn_IsTxnValid = true;
1036
txn_PerformIdleTasks();
1039
if (self->myMustQuit)
1042
// txn_TxnIndex walks through the records of the current transaction.
if (txn_TransCache->tc_GetRecAt(txn_CurrentTxn, txn_TxnIndex++, tran, state))
1046
txn_TransCache->tc_FreeTransaction(txn_CurrentTxn);
1048
if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) {
1049
txn_ResetReadPosition(log_position);
1051
if (txn_TransCache->tc_ShoulReloadCache()) {
1052
uint64_t pos = txn_TransCache->tc_StartCacheReload();
1053
txn_ResetReadPosition(pos);
1054
txn_LoadTransactionCache(pos);
1055
txn_TransCache->tc_CompleteCacheReload();
1057
// Lock the object to prevent writer thread updates while I check again.
1058
// This is to ensure that txn_EOL is not changed between the call to
1059
// tc_GetTransactionStartPosition() and setting the read position.
1061
if (txn_TransCache->tc_GetTransactionStartPosition(&log_position))
1062
txn_ResetReadPosition(log_position);
1064
txn_ResetReadPosition(txn_EOL);
1069
txn_IsTxnValid = false;
1073
unlock_(txn_reader);
1078
// Fills stats with the current state of the log and its cache.
// ts_LogSize is txn_Overflow while overflowing, otherwise the current record
// count; ts_PercentFull is that size relative to the requested list size
// stored in the header.
// NOTE(review): th_requested_list_size_8 is used as a divisor without a zero
// check in the visible code.
void MSTrans::txn_GetStats(MSTransStatsPtr stats)
1081
if (txn_HaveOverflow) {
1082
stats->ts_IsOverflowing = true;
1083
stats->ts_LogSize = txn_Overflow;
1085
stats->ts_IsOverflowing = false;
1086
stats->ts_LogSize = txn_GetNumRecords();
1088
stats->ts_PercentFull = (stats->ts_LogSize * 100) / CS_GET_DISK_8(txn_DiskHeader.th_requested_list_size_8);
1090
stats->ts_MaxSize = txn_HighWaterMark;
1091
stats->ts_OverflowCount = txn_OverflowCount;
1093
stats->ts_TransCacheSize = txn_TransCache->tc_GetCacheUsed();
1094
stats->ts_PercentTransCacheUsed = txn_TransCache->tc_GetPercentCacheUsed();
1095
stats->ts_PercentCacheHit = txn_TransCache->tc_GetPercentCacheHit();
1098
// Changes the transaction cache size: stores new_size in the header on disk
// (th_requested_cache_size_4) and resizes the in-memory cache.
// NOTE(review): the lock acquisition lines are not visible in this excerpt;
// only the unlock of txn_reader is.
void MSTrans::txn_SetCacheSize(uint32_t new_size)
1101
// Important lock order. Writer threads never lock the reader but the reader
1102
// may lock this object so always lock the reader first.
1106
CS_SET_DISK_4(txn_DiskHeader.th_requested_cache_size_4, new_size);
1108
txn_File->write(&(txn_DiskHeader.th_requested_cache_size_4), offsetof(MSDiskTransHeadRec, th_requested_cache_size_4), 4);
1112
txn_TransCache->tc_SetSize(new_size);
1115
unlock_(txn_reader);
1119
// Sets the requested log size. new_size is in bytes and includes the file
// header; it is converted to a record count, clamped to a minimum of 10
// records and stored in the header on disk (th_requested_list_size_8).
// NOTE(review): the lock acquisition and any follow-up resize call are not
// visible in this excerpt; only the unlock of txn_reader is.
void MSTrans::txn_SetLogSize(uint64_t new_size)
1123
// Important lock order. Writer threads never lock the reader but the reader
1124
// may lock this object so always lock the reader first.
1128
// Guard the unsigned subtraction: a new_size smaller than the header would
// otherwise wrap around to a huge record count and bypass the clamp below.
txn_ReqestedMaxRecords = (new_size > sizeof(MSDiskTransHeadRec)) ? (new_size - sizeof(MSDiskTransHeadRec)) / sizeof(MSDiskTransRec) : 0;
1130
if (txn_ReqestedMaxRecords < 10)
1131
txn_ReqestedMaxRecords = 10;
1133
CS_SET_DISK_8(txn_DiskHeader.th_requested_list_size_8, txn_ReqestedMaxRecords);
1135
txn_File->write(&(txn_DiskHeader.th_requested_list_size_8), offsetof(MSDiskTransHeadRec, th_requested_list_size_8), 8);
1140
unlock_(txn_reader);
1145
// A helper class for resetting database IDs in the transaction log.
1146
// Log reader specialisation (privately derived from ReadTXNLog) that scans
// the log for records whose tr_db_id matches sdb_db_id and writes them back
// with rl_Store().
// NOTE(review): member declarations, access specifiers and the lines that
// modify the record before it is stored are not visible in this excerpt.
class DBSearchTXNLog : ReadTXNLog {
1148
DBSearchTXNLog(MSTrans *log): ReadTXNLog(log), sdb_db_id(0), sdb_isDirty(false) {}
1153
// Always continue: this scan does not depend on the transaction cache state.
virtual bool rl_CanContinue() { return true;}
1154
// Called by rl_ReadLog() for every record read.
virtual void rl_Load(uint64_t log_position, MSTransPtr rec)
1156
if (rec->tr_db_id == sdb_db_id) {
1159
rl_Store(log_position, rec);
1163
// Entry point: scans the log from its current start position.
void SetDataBaseIDToZero(uint32_t db_id)
1166
rl_ReadLog(rl_log->txn_GetStartPosition(), false);
1172
// Dropping the database from the transaction log just involves
1173
// scanning the log and setting the database id of any transactions
1174
// involving the dropped database to zero.
1175
// Removes all traces of database db_id from the transaction log: first from
// the in-memory transaction cache, then from the log file by scanning it
// with DBSearchTXNLog.
// NOTE(review): the lock acquisition lines are not visible in this excerpt;
// only the unlock of txn_reader is.
void MSTrans::txn_dropDatabase(uint32_t db_id)
1179
// Important lock order. Writer threads never lock the reader but the reader
1180
// may lock this object so always lock the reader first.
1184
// Clear any transaction records in the cache for the dropped database;
1185
txn_TransCache->tc_dropDatabase(db_id);
1187
// Scan the log setting the database ID for any record belonging to the
1188
// dropped database to zero.
1189
DBSearchTXNLog searchLog(this);
1191
searchLog.SetDataBaseIDToZero(db_id);
1194
unlock_(txn_reader);
1199
void MSTrans::txn_DumpLog(const char *file)
1201
size_t size, read_start = 0;
1205
fptr = fopen(file, "w+");
1212
size = txn_Overflow;
1214
size = txn_MaxRecords;
1216
// Dump all the records
1218
MSDiskTransRec diskRecords[1000];
1227
// Read the next block of records.
1228
offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);
1229
txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
1231
for (uint32_t i = 0; i < read_size; i++) {
1232
const char *ttype, *cmt;
1234
MSDiskTransPtr drec = diskRecords + i;
1235
GET_DISK_TRANSREC(&rec, drec);
1237
switch (TRANS_TYPE(rec.tr_type)) {
1238
case MS_ReferenceTxn:
1241
case MS_DereferenceTxn:
1244
case MS_RollBackTxn:
1246
rec.tr_blob_ref_id = 0;
1248
case MS_RecoveredTxn:
1250
rec.tr_blob_ref_id = 0;
1256
if (TRANS_IS_TERMINATED(rec.tr_type))
1262
fprintf(fptr, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" \t %s %s %s\n", rec.tr_id, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id,
1263
((read_start + i) == txn_Start) ? "START":"",
1264
((read_start + i) == txn_EOL) ? "EOL":"",
1265
((read_start + i) == txn_MaxRecords) ? "OverFlow":""
1270
read_start += read_size;