1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
* PBMS transaction cache.
29
#include "cslib/CSConfig.h"
32
#include "cslib/CSGlobal.h"
34
#include "trans_cache_ms.h"
36
#define LIST_INC_SIZE 256 // If the list starts to grow it is probably because a backup is in progress so it could get quite large.
37
#define MIN_LIST_SIZE 32 // A list size that should be able to handle a normal transaction load.
38
#define MIN_CACHE_RECORDS 2
40
typedef struct myTrans {
41
uint8_t tc_type; // The transaction type. If the first bit is set then the transaction is an autocommit.
42
uint32_t tc_db_id; // The database ID for the operation.
43
uint32_t tc_tab_id; // The table ID for the operation.
44
bool tc_rolled_back; // 'true' if this action has been rolled back.
45
uint64_t tc_blob_id; // The blob ID for the operation.
46
uint64_t tc_blob_ref_id; // The blob reference id.
47
uint64_t tc_position; // The log position of the record.
48
} myTransRec, *myTransPtr;
50
#define BAD_LOG_POSITION ((uint64_t) -1)
51
typedef struct TransList {
56
uint64_t log_position; // The transaction log position of the start of the transaction.
57
MS_TxnState terminated; //
58
size_t size; // The allocated size of the list.
59
size_t len; // The number of records in the list that are being used.
61
} TransListRec, *TransListPtr;
63
MSTransCache::MSTransCache(): CSSharedRefObject(),
70
tc_TotalTransCount(0),
71
tc_TotalCacheCount(0),
72
tc_ReLoadingThread(NULL),
79
MSTransCache::~MSTransCache()
82
for (uint32_t i = 0; i < tc_Size; i++) {
84
cs_free(tc_List[i].list);
90
MSTransCache *MSTransCache::newMSTransCache(uint32_t min_size)
92
MSTransCache *tl = NULL;
95
new_(tl, MSTransCache());
98
if (MIN_LIST_SIZE > min_size)
99
min_size = MIN_LIST_SIZE;
101
tl->tc_Initialize(min_size);
109
void MSTransCache::tc_Initialize(uint32_t size)
114
size++; // Add an extra for the overflow
115
tc_List = (TransListPtr) cs_malloc(size * sizeof(TransListRec));
117
// Give each new transaction list record a short list of transaction records
118
for (uint32_t i = 0; i < tc_Size; i++) {
119
tc_List[i].list = (myTransPtr) cs_malloc(MIN_CACHE_RECORDS * sizeof(myTransRec));
120
tc_List[i].size = MIN_CACHE_RECORDS;
123
tc_List[i].log_position = 0;
124
tc_List[i].terminated = MS_Running;
127
tc_OverFlow = tc_List + tc_Size;
129
tc_OverFlow->list = NULL;
130
tc_OverFlow->size = 0;
131
tc_OverFlow->len = 0;
132
tc_OverFlow->tid = 0;
133
tc_OverFlow->log_position = 0;
134
tc_OverFlow->terminated = MS_Running;
138
//--------------------
139
void MSTransCache::tc_SetSize(uint32_t cache_size)
145
if (cache_size < MIN_LIST_SIZE)
146
cache_size = MIN_LIST_SIZE;
148
// If the cache is being reduced then free the record
149
// lists if the transactions about to be removed.
150
for (uint32_t i = cache_size +1; i < tc_Size; i++) {
152
cs_free(tc_List[i].list);
155
// Add one to cache_size for overflow.
156
cs_realloc((void **) &tc_List, (cache_size +1) * sizeof(TransListRec));
158
if (cache_size > tc_Size) {
159
// Move the overflow record.
160
memcpy(tc_List + cache_size, tc_List + tc_Size, sizeof(TransListRec));
162
for (uint32_t i = tc_Size; i < cache_size; i++) {
163
tc_List[i].list = (myTransPtr) cs_malloc(MIN_CACHE_RECORDS * sizeof(myTransRec));
164
tc_List[i].size = MIN_CACHE_RECORDS;
167
tc_List[i].log_position = 0;
168
tc_List[i].terminated = MS_Running;
174
tc_Size = cache_size;
175
tc_OverFlow = tc_List + tc_Size;
182
bool MSTransCache::tc_ShoulReloadCache()
184
return (((tc_Used +1) < tc_Size) && tc_Full);
187
uint64_t MSTransCache::tc_StartCacheReload(bool startup)
193
ASSERT((startup) || tc_Full);
194
tc_ReLoadingThread = self;
195
tc_OverFlowTID = tc_OverFlow->tid;
198
self->myTransRef = 0;
203
return_(tc_OverFlow->log_position);
206
bool MSTransCache::tc_ContinueCacheReload()
208
// Reload should continue until the list is full again and the termination records
209
// for the first and overflow transactions have been found.
211
// It is assumed the reload will also stop if there are no more records to
212
// be read in from the log.
214
return ((tc_List[tc_First].terminated == MS_Running) || // Keep searching for the terminator for the first txn.
215
(tc_OverFlow->tid == tc_OverFlowTID) || // The old overflow txn has not yet been loaded.
216
(tc_OverFlow->terminated == MS_Running) // If the overflow tnx is terminated then the cache is also full.
221
void MSTransCache::tc_CompleteCacheReload()
225
tc_ReLoadingThread = NULL;
226
if (tc_OverFlowTID) { // Clear the overflow condition;
227
tc_OverFlow->tid = 0;
235
#define OVERFLOW_TREF (tc_Size)
236
#define MAX_TREF (OVERFLOW_TREF +1)
238
// Create a new transaction record for the specified
240
TRef MSTransCache::tc_NewTransaction(uint32_t tid)
247
if (self != tc_ReLoadingThread) {
248
tc_TotalTransCount++;
251
// Once we have entered an overflow state we remain in it until
252
// the cache has been reloaded even if there is now space in the cache.
253
// This is to ensure that the transactions are loaded into the cache
254
// in the correct order.
255
// While reloading, make sure that any attempt to add a transaction by any thread
256
// other than tc_ReLoadingThread recieves an overflow condition.
259
if (tc_ReLoadingThread != self) {
266
ASSERT(tc_OverFlow->tid == tid); // The first txn reloaded should be the overflow txn
270
if (tid == tc_OverFlowTID) {
272
tc_OverFlow->old_tid = tid;
274
tc_OverFlow->tid = 0;
275
tc_OverFlow->terminated = MS_Running;
276
ASSERT((tc_Used +1) < tc_Size); // There should be room in the list for the old everflow txn.
277
} else if (tc_OverFlowTID == 0) {
278
// We are seaching for the end of the overflow txn
279
// and found the start of another txn.
285
if ((tc_Used +1) == tc_Size){
286
// The cache is full.
288
tc_OverFlow->tid = tid; // save the tid of the first transaction to overflow.
289
tc_OverFlow->log_position = BAD_LOG_POSITION;
290
tc_OverFlow->len = 0;
291
tc_OverFlow->terminated = MS_Running;
301
if (self != tc_ReLoadingThread) {
302
tc_TotalCacheCount++;
309
static uint32_t last_tid = 0;
310
static bool last_state = false;
311
if (tc_Recovering != last_state)
314
last_state = tc_Recovering;
315
if (!( ((last_tid + 1) == tid) || !last_tid))
316
printf("Expected tid %"PRIu32"\n", last_tid + 1);
317
ASSERT( ((last_tid + 1) == tid) || !last_tid);
322
tc_List[ref].tid = tid;
323
tc_List[ref].len = 0;
324
tc_List[ref].log_position = BAD_LOG_POSITION;
325
tc_List[ref].terminated = MS_Running;
327
// Update these after initializing the structure because
328
// the reader thread may read it as soon as tc_EOL is updated.
332
if (tc_EOL == tc_Size)
337
self->myTransRef = ref;
338
self->myCacheVersion = tc_CacheVersion;
342
void MSTransCache::tc_FindTXNRef(uint32_t tid, TRef *tref)
344
uint32_t i = tc_First;
347
// Search for the record
348
if (tc_First > tc_EOL) {
349
for (; i < OVERFLOW_TREF && *tref >= MAX_TREF; i++) {
350
if (tc_List[i].tid == tid)
356
for (; i < tc_EOL && *tref >= MAX_TREF; i++) {
357
if (tc_List[i].tid == tid)
361
// Do not return the overflow reference if the tid = tc_OverFlowTID.
362
// This may seem a bit strange but it is needed so that the overflow txn
363
// will get a new non-overflow cache slot when it is reloaded.
364
if ((*tref >= MAX_TREF) && (tid == tc_OverFlow->tid) && (tid != tc_OverFlowTID))
365
*tref = OVERFLOW_TREF;
368
self->myTransRef = *tref;
369
self->myCacheVersion = tc_CacheVersion;
373
// Add a transaction record to an already existing transaction
374
// or possible creating a new one. Depending on the record added this may
375
// also commit or rollback the transaction.
376
void MSTransCache::tc_AddRec(uint64_t log_position, MSTransPtr rec, TRef tref)
384
if (tref == TRANS_CACHE_UNKNOWN_REF) { // It is coming from a reload
385
ASSERT(tc_ReLoadingThread == self); // Sanity check here
387
if ((self->myTID == rec->tr_id) && (self->myTransRef != TRANS_CACHE_UNKNOWN_REF))
388
tref = self->myTransRef;
390
tc_FindTXNRef(rec->tr_id, &tref);
391
if (tref == TRANS_CACHE_UNKNOWN_REF) {
392
if (!TRANS_IS_START(rec->tr_type))
393
goto done; // Ignore partial tansaction reloads.
395
tref = tc_NewTransaction(rec->tr_id);
400
ASSERT((tref <= MAX_TREF) || (tref == TRANS_CACHE_NEW_REF));
401
ASSERT(self->myTID == rec->tr_id);
404
if (tref >= OVERFLOW_TREF) {
405
if (tref == TRANS_CACHE_NEW_REF) {
406
ASSERT(TRANS_IS_START(rec->tr_type));
407
tref = tc_NewTransaction(rec->tr_id);
408
} else if (self->myCacheVersion != tc_CacheVersion) {
409
// Check to see if the transaction if now in the cache
410
tc_FindTXNRef(rec->tr_id, &tref);
413
if (tref >= OVERFLOW_TREF){ // Overflow.
414
if (tref == OVERFLOW_TREF) {
415
if (!tc_OverFlow->len)
416
tc_OverFlow->log_position = log_position;
419
if (TRANS_IS_TERMINATED(rec->tr_type)) {
420
if (rec->tr_type == MS_RollBackTxn)
421
tc_OverFlow->terminated = MS_RolledBack;
422
else if (rec->tr_type == MS_RecoveredTxn)
423
tc_OverFlow->terminated = MS_Recovered;
425
tc_OverFlow->terminated = MS_Committed;
433
lrec = tc_List + tref;
436
ASSERT(lrec->tid == rec->tr_id);
438
if (!lrec->len) { // The first record in the transaction
439
lrec->log_position = log_position;
440
} else if (( (TRANS_TYPE(rec->tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec->tr_type) == MS_DereferenceTxn)) && !tc_Recovering) {
441
// Make sure the record isn't already in the list.
442
// This can happen during cache reload.
443
for (uint32_t i = 0; i < lrec->len; i++) {
444
if (lrec->list[i].tc_position == log_position)
449
// During recovery there is no need to cache the records.
450
if (!tc_Recovering) {
451
switch (TRANS_TYPE(rec->tr_type)) {
454
case MS_RecoveredTxn:
455
// This is handled below;
458
case MS_PartialRollBackTxn:
460
// The rollback position is stored in the place for the database id.
461
for (uint32_t i = rec->tr_db_id;i < lrec->len; i++)
462
lrec->list[i].tc_rolled_back = true;
467
case MS_ReferenceTxn:
468
case MS_DereferenceTxn:
472
if (lrec->len == lrec->size) { //Grow the list if required
473
cs_realloc((void **) &(lrec->list), (lrec->size + 10)* sizeof(myTransRec));
477
my_rec = lrec->list + lrec->len;
478
my_rec->tc_type = rec->tr_type;
479
my_rec->tc_db_id = rec->tr_db_id;
480
my_rec->tc_tab_id = rec->tr_tab_id;
481
my_rec->tc_blob_id = rec->tr_blob_id;
482
my_rec->tc_blob_ref_id = rec->tr_blob_ref_id;
483
my_rec->tc_position = log_position;
484
my_rec->tc_rolled_back = false;
491
} else if ( (TRANS_TYPE(rec->tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec->tr_type) == MS_DereferenceTxn))
495
// Check to see if this is a commit or rollback
496
// Do this last because as soon as it is marked as terminated
497
// the reader thread may start processing it.
498
if (TRANS_IS_TERMINATED(rec->tr_type)) {
499
if (rec->tr_type == MS_RollBackTxn)
500
lrec->terminated = MS_RolledBack;
501
else if (rec->tr_type == MS_RecoveredTxn)
502
lrec->terminated = MS_Recovered;
504
lrec->terminated = MS_Committed;
512
// Get the transaction ref of the first transaction in the list.
513
// Sets committed to true or false depending on if the transaction is terminated.
514
// If there is no trsansaction then false is returned.
515
bool MSTransCache::tc_GetTransaction(TRef *ref, bool *terminated)
520
ASSERT(tc_List[tc_First].tid);
523
*terminated = (tc_List[tc_First].terminated != MS_Running);
529
bool MSTransCache::tc_GetTransactionStartPosition(uint64_t *log_position)
531
if ((!tc_Used) || (tc_List[tc_First].len == 0))
534
*log_position = tc_List[tc_First].log_position;
539
MS_TxnState MSTransCache::tc_TransactionState(TRef ref)
541
ASSERT((ref < tc_Size) && tc_List[ref].tid);
543
return tc_List[ref].terminated;
546
uint32_t MSTransCache::tc_GetTransactionID(TRef ref)
548
ASSERT((ref < tc_Size) && tc_List[ref].tid);
550
return (tc_List[ref].tid);
553
// Remove the transaction and all record associated with it.
554
void MSTransCache::tc_FreeTransaction(TRef tref)
558
ASSERT(tc_Used && (tref < tc_Size) && tc_List[tref].tid);
562
static uint32_t last_tid = 0;
563
static bool last_state = false;
565
if (tc_Recovering != last_state)
568
last_state = tc_Recovering;
569
ASSERT( ((last_tid + 1) == tc_List[tref].tid) || !last_tid);
570
last_tid = tc_List[tref].tid;
574
lrec = tc_List + tref;
576
lrec->old_tid = lrec->tid;
581
if (lrec->size > 10) { // Free up some excess records.
582
cs_realloc((void **) &(lrec->list), 10* sizeof(myTransRec));
589
if (tref == tc_First) { // Reset the start of the list.
590
TRef eol = tc_EOL; // cache this incase it changes
592
// Skip any unused records indicated by a zero tid.
593
if (tc_First > eol) {
594
for (; tc_First < tc_Size && !tc_List[tc_First].tid; tc_First++) ;
596
if (tc_First == tc_Size)
600
for (; tc_First < eol && !tc_List[tc_First].tid; tc_First++) ;
603
ASSERT( (tc_Used == 0 && tc_First == tc_EOL) || (tc_Used != 0 && tc_First != tc_EOL));
610
//--------------------
611
bool MSTransCache::tc_GetRecAt(TRef tref, size_t index, MSTransPtr rec, MS_TxnState *state)
616
ASSERT(tc_Used && (tref < tc_Size) && tc_List[tref].tid);
619
static uint32_t last_tid = 0;
621
ASSERT( ((last_tid + 1) == tc_List[tref].tid) || (last_tid == tc_List[tref].tid) || !last_tid);
622
last_tid = tc_List[tref].tid;
626
lrec = tc_List + tref;
627
if (index < lrec->len) {
628
myTransPtr my_rec = lrec->list + index;
630
rec->tr_type = my_rec->tc_type;
631
rec->tr_db_id = my_rec->tc_db_id;
632
rec->tr_tab_id = my_rec->tc_tab_id;
633
rec->tr_blob_id = my_rec->tc_blob_id;
634
rec->tr_blob_ref_id = my_rec->tc_blob_ref_id;
635
rec->tr_id = lrec->tid;
637
if (my_rec->tc_rolled_back)
638
*state = MS_RolledBack;
640
*state = lrec->terminated;
648
//--------------------
649
void MSTransCache::tc_dropDatabase(uint32_t db_id)
654
for (uint32_t i = 0; i < tc_Size; i++) {
655
myTransPtr rec = tc_List[i].list;
657
uint32_t list_len = tc_List[i].len;
659
if (rec->tc_db_id == db_id)