1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25
* PBMS transaction daemon.
30
#include "cslib/CSConfig.h"
33
#include "cslib/CSGlobal.h"
34
#include "cslib/CSStrUtil.h"
35
#include "cslib/CSLog.h"
39
#include "open_table_ms.h"
40
#include "trans_log_ms.h"
41
#include "transaction_ms.h"
42
#include "pbmsdaemon_ms.h"
45
* The pbms_ functions are utility functions supplied by ha_pbms.cc
47
void pbms_take_part_in_transaction(void *thread);
49
MSTrans *MSTransactionManager::tm_Log;
50
MSTransactionThread *MSTransactionManager::tm_Reader;
54
CSDiskValue4 lr_time_4; // The database ID for the operation.
55
CSDiskValue1 lr_state_1; // The transaction state.
56
CSDiskValue1 lr_type_1; // The transaction type. If the first bit is set then the transaction is an autocommit.
57
CSDiskValue4 lr_db_id_4; // The database ID for the operation.
58
CSDiskValue4 lr_tab_id_4; // The table ID for the operation.
59
CSDiskValue8 lr_blob_id_8; // The blob ID for the operation.
60
CSDiskValue8 lr_blob_ref_id_8;// The blob reference id.
61
} MSDiskLostRec, *MSDiskLostPtr;
64
* ---------------------------------------------------------------
65
* The transaction reader thread
68
class MSTransactionThread : public CSDaemon {
70
MSTransactionThread(MSTrans *txn_log);
72
virtual ~MSTransactionThread(){} // Do nothing here because 'self' will no longer be valid, use completeWork().
77
virtual bool doWork();
79
virtual void *completeWork();
85
void reportLostReference(MSTransPtr rec, MS_TxnState state);
86
void dereference(MSTransPtr rec, MS_TxnState state);
87
void commitReference(MSTransPtr rec, MS_TxnState state);
94
MSTransactionThread::MSTransactionThread(MSTrans *txn_log):
100
trt_log->txn_SetReader(this);
103
void MSTransactionThread::close()
106
trt_lostLog->close();
109
void MSTransactionThread::reportLostReference(MSTransPtr rec, MS_TxnState state)
112
const char *t_txt, *s_txt;
113
char b1[16], b2[16], msg[100];
115
//if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == true)
125
s_txt = "RolledBack";
134
snprintf(b1, 16, "(%d)?", state);
138
switch (TRANS_TYPE(rec->tr_type)) {
139
case MS_DereferenceTxn:
140
t_txt = "Dereference";
142
case MS_ReferenceTxn:
146
snprintf(b2, 16, "(%x)?", rec->tr_type);
150
snprintf(msg, 100, "Lost PBMS record: %s %s db_id: %"PRIu32" tab_id: %"PRIu32" blob_id: %"PRIu64"", s_txt, t_txt, rec->tr_db_id, rec->tr_tab_id, rec->tr_blob_id);
151
CSL.logLine(self, CSLog::Warning, msg);
153
CS_SET_DISK_4(lrec.lr_time_4, time(NULL));
154
CS_SET_DISK_1(lrec.lr_state_1, state);
155
CS_SET_DISK_1(lrec.lr_type_1, rec->tr_type);
156
CS_SET_DISK_4(lrec.lr_db_id_4, rec->tr_db_id);
157
CS_SET_DISK_4(lrec.lr_tab_id_4, rec->tr_tab_id);
158
CS_SET_DISK_8(lrec.lr_blob_id_8, rec->tr_blob_id);
159
CS_SET_DISK_8(lrec.lr_blob_ref_id_8, rec->tr_blob_ref_id);
163
char *str = cs_strdup(trt_log->txn_GetTXNLogPath());
164
cs_remove_last_name_of_path(str);
166
path = CSPath::newPath(str, "pbms_lost_txn.dat");
169
trt_lostLog = CSFile::newFile(path);
170
trt_lostLog->open(CSFile::CREATE);
172
trt_lostLog->write(&lrec, trt_lostLog->getEOF(), sizeof(MSDiskLostRec));
178
void MSTransactionThread::dereference(MSTransPtr rec, MS_TxnState state)
184
otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
186
otab->freeReference(rec->tr_blob_id, rec->tr_blob_ref_id);
191
reportLostReference(rec, state);
198
void MSTransactionThread::commitReference(MSTransPtr rec, MS_TxnState state)
204
otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
206
otab->commitReference(rec->tr_blob_id, rec->tr_blob_ref_id);
211
reportLostReference(rec, state);
218
void MSTransactionThread::flush()
222
// For now I just wait until the transaction queue is empty or
223
// the transaction at the head of the queue has not yet been
226
// What needs to be done is for the transaction log to scan
227
// past the non commited transaction to see if there are any
228
// other committed transaction in the log and apply them if found.
230
wakeup(); // Incase the reader is sleeping.
231
while (trt_log->txn_haveNextTransaction() && !isSuspend() && self->myMustQuit)
236
bool MSTransactionThread::doWork()
238
MSTransRec rec = {0,0,0,0,0,0,0};
243
while (!myMustQuit) {
244
// This will sleep while waiting for the next
245
// completed transaction.
246
trt_log->txn_GetNextTransaction(&rec, &state);
250
if (rec.tr_db_id == 0) // The database was dropped.
253
if (state == MS_Committed){
254
if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn)
255
dereference(&rec, state);
256
else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
257
commitReference(&rec, state);
259
} else if (state == MS_RolledBack) {
260
// There is nothing to do on rollback of a dereference.
262
if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
263
dereference(&rec, state);
265
} else if (state == MS_Recovered) {
266
if ((TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn))
267
reportLostReference(&rec, state); // Report these even though they may not be lost.
269
// Because of the 2 phase commit issue with other engines I cannot
270
// just roll back the transaction because it may have been committed
271
// on the master engine. So to be safe I will always err on the side
272
// of having unreference BLOBs in the repository rather than risking
273
// deleting a BLOB that was referenced. To this end I will commit references
274
// while ignoring (rolling back) dereferences.
275
if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
276
commitReference(&rec, state);
283
self->logException();
284
CSL.logLine(NULL, CSLog::Error, "!!!!!!!! THE PBMS TRANSACTION LOG READER DIED! !!!!!!!!!!!");
290
void *MSTransactionThread::completeWork()
298
trt_lostLog->release();
303
* ---------------------------------------------------------------
304
* The transaction log manager
306
void MSTransactionManager::startUpReader()
308
char pbms_path[PATH_MAX];
311
cs_strcpy(PATH_MAX, pbms_path, PBMSDaemon::getPBMSDir());
312
cs_add_name_to_path(PATH_MAX, pbms_path, "ms-trans-log.dat");
314
tm_Log = MSTrans::txn_NewMSTrans(pbms_path);
315
new_(tm_Reader, MSTransactionThread(RETAIN(tm_Log)));
319
// Wait for the transaction reader to recover any old transaction:
325
void MSTransactionManager::startUp()
330
// Do not start the reader if the pbms dir doesn't exist.
331
path = CSPath::newPath(PBMSDaemon::getPBMSDir());
333
if (path->exists()) {
341
void MSTransactionManager::shutDown()
345
tm_Reader->release();
354
void MSTransactionManager::flush()
360
void MSTransactionManager::suspend(bool do_flush)
368
tm_Reader->suspend();
373
void MSTransactionManager::resume()
382
void MSTransactionManager::commit()
389
self->myStmtCount = 0;
390
self->myStartStmt = 0;
392
tm_Log->txn_LogTransaction(MS_CommitTxn);
398
void MSTransactionManager::rollback()
405
self->myStmtCount = 0;
406
self->myStartStmt = 0;
408
tm_Log->txn_LogTransaction(MS_RollBackTxn);
413
class MSTransactionCheckPoint: public CSCString
416
MSTransactionCheckPoint(const char *name, uint32_t stmtCount ):CSCString()
418
myCString = cs_strdup(name);
419
myStrLen = strlen(name);
421
position = stmtCount;
428
void MSTransactionManager::setSavepoint(const char *savePoint)
430
MSTransactionCheckPoint *checkPoint;
433
new_(checkPoint, MSTransactionCheckPoint(savePoint, self->myStmtCount));
436
self->mySavePoints.add(checkPoint);
442
void MSTransactionManager::releaseSavepoint(const char *savePoint)
444
MSTransactionCheckPoint *checkPoint;
448
name = CSCString::newString(savePoint);
451
checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
455
self->mySavePoints.remove(checkPoint);
460
void MSTransactionManager::rollbackTo(const char *savePoint)
462
MSTransactionCheckPoint *checkPoint;
466
name = CSCString::newString(savePoint);
469
checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
473
uint32_t position = checkPoint->position;
475
self->mySavePoints.remove(checkPoint);
476
rollbackToPosition(position);
483
void MSTransactionManager::rollbackToPosition(uint32_t position)
487
ASSERT(self->myStmtCount > position);
491
tm_Log->txn_LogPartialRollBack(position);
496
void MSTransactionManager::dropDatabase(uint32_t db_id)
503
tm_Log->txn_dropDatabase(db_id);
508
void MSTransactionManager::logTransaction(bool ref, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
516
bool autocommit = false;
517
autocommit = ms_is_autocommit();
520
pbms_take_part_in_transaction(ms_my_get_thread());
523
self->myIsAutoCommit = autocommit;
526
// PBMS always explicitly commits
527
tm_Log->txn_LogTransaction((ref)?MS_ReferenceTxn:MS_DereferenceTxn, false /*autocommit*/, db_id, tab_id, blob_id, blob_ref_id);