1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25
* PBMS transaction daemon.
34
#include "CSStrUtil.h"
40
#include "OpenTable_ms.h"
41
#include "TransLog_ms.h"
42
#include "Transaction_ms.h"
44
MSTrans *MSTransactionManager::tm_Log;
45
MSTransactionThread *MSTransactionManager::tm_Reader;
49
CSDiskValue4 lr_time_4; // The time the lost record was reported (set from time(NULL) when the record is written).
50
CSDiskValue1 lr_state_1; // The transaction state.
51
CSDiskValue1 lr_type_1; // The transaction type. If the first bit is set then the transaction is an autocommit.
52
CSDiskValue4 lr_db_id_4; // The database ID for the operation.
53
CSDiskValue4 lr_tab_id_4; // The table ID for the operation.
54
CSDiskValue8 lr_blob_id_8; // The blob ID for the operation.
55
CSDiskValue8 lr_blob_ref_id_8;// The blob reference id.
56
} MSDiskLostRec, *MSDiskLostPtr;
59
* ---------------------------------------------------------------
60
* The transaction reader thread
63
class MSTransactionThread : public CSDaemon {
65
MSTransactionThread(MSTrans *txn_log);
67
virtual ~MSTransactionThread();
71
virtual bool doWork();
73
virtual void *finalize();
77
void reportLostReference(MSTransPtr rec, MS_TxnState state);
78
void dereference(MSTransPtr rec, MS_TxnState state);
79
void commitReference(MSTransPtr rec, MS_TxnState state);
86
MSTransactionThread::MSTransactionThread(MSTrans *txn_log):
91
log->txn_SetReader(this);
94
MSTransactionThread::~MSTransactionThread()
105
void MSTransactionThread::close()
111
void MSTransactionThread::reportLostReference(MSTransPtr rec, MS_TxnState state)
114
const char *t_txt, *s_txt;
115
char b1[16], b2[16], msg[100];
123
s_txt = "RolledBack";
132
snprintf(b1, 16, "(%d)?", state);
136
switch (TRANS_TYPE(rec->tr_type)) {
137
case MS_DereferenceTxn:
138
t_txt = "Dereference";
140
case MS_ReferenceTxn:
144
snprintf(b2, 16, "(%x)?", rec->tr_type);
148
snprintf(msg, 100, "Lost PBMS record: %s %s db_id: %"PRIu32" tab_id: %"PRIu32" blob_id: %"PRIu64"", s_txt, t_txt, rec->tr_db_id, rec->tr_tab_id, rec->tr_blob_id);
149
CSL.logLine(self, CSLog::Warning, msg);
151
CS_SET_DISK_4(lrec.lr_time_4, time(NULL));
152
CS_SET_DISK_1(lrec.lr_state_1, state);
153
CS_SET_DISK_1(lrec.lr_type_1, rec->tr_type);
154
CS_SET_DISK_4(lrec.lr_db_id_4, rec->tr_db_id);
155
CS_SET_DISK_4(lrec.lr_tab_id_4, rec->tr_tab_id);
156
CS_SET_DISK_8(lrec.lr_blob_id_8, rec->tr_blob_id);
157
CS_SET_DISK_8(lrec.lr_blob_ref_id_8, rec->tr_blob_ref_id);
161
char *str = cs_strdup(log->txn_GetTXNLogPath());
162
cs_remove_last_name_of_path(str);
164
path = CSPath::newPath(str, "pbms_lost_txn.dat");
167
lostLog = CSFile::newFile(path);
168
lostLog->open(CSFile::CREATE);
170
lostLog->write(&lrec, lostLog->getEOF(), sizeof(MSDiskLostRec));
176
void MSTransactionThread::dereference(MSTransPtr rec, MS_TxnState state)
182
otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
184
otab->freeReference(rec->tr_blob_id, rec->tr_blob_ref_id);
189
reportLostReference(rec, state);
196
void MSTransactionThread::commitReference(MSTransPtr rec, MS_TxnState state)
202
otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
204
otab->commitReference(rec->tr_blob_id, rec->tr_blob_ref_id);
209
reportLostReference(rec, state);
216
void MSTransactionThread::flush()
220
// For now I just wait until the transaction queue is empty or
221
// the transaction at the head of the queue has not yet been
224
// What needs to be done is for the transaction log to scan
225
// past the non-committed transaction to see if there are any
226
// other committed transactions in the log and apply them if found.
228
wakeup(); // In case the reader is sleeping.
229
while (log->txn_haveNextTransaction() && !isSuspend())
234
bool MSTransactionThread::doWork()
236
MSTransRec rec = {0};
241
while (!myMustQuit) {
242
// This will sleep while waiting for the next
243
// completed transaction.
244
log->txn_GetNextTransaction(&rec, &state);
248
if (rec.tr_db_id == 0) // The database was dropped.
251
if (state == MS_Committed){
252
if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn)
253
dereference(&rec, state);
254
else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
255
commitReference(&rec, state);
257
} else if (state == MS_RolledBack) {
258
// There is nothing to do on rollback of a dereference.
260
if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
261
dereference(&rec, state);
263
} else if (state == MS_Recovered) {
264
if ((TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn))
265
reportLostReference(&rec, state); // Report these even though they may not be lost.
267
// Because of the 2 phase commit issue with other engines I cannot
268
// just roll back the transaction because it may have been committed
269
// on the master engine. So to be safe I will always err on the side
270
// of having unreferenced BLOBs in the repository rather than risking
271
// deleting a BLOB that was referenced. To this end I will commit references
272
// while ignoring (rolling back) dereferences.
273
if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
274
commitReference(&rec, state);
281
self->logException();
282
CSL.logLine(NULL, CSLog::Error, "!!!!!!!! THE PBMS TRANSACTION LOG READER DIED! !!!!!!!!!!!");
288
void *MSTransactionThread::finalize()
295
* ---------------------------------------------------------------
296
* The transaction log manager
298
void MSTransactionManager::startUpReader()
303
new_(log, CSStringBuffer(20));
305
log->append(ms_my_get_mysql_home_path());
307
log->append("/ms-trans-log.dat");
309
tm_Log = MSTrans::txn_NewMSTrans(log->getCString());
310
new_(tm_Reader, MSTransactionThread(RETAIN(tm_Log)));
317
void MSTransactionManager::startUp()
322
// Do not start the reader if the pbms dir doesn't exist.
323
path = CSPath::newPath(ms_my_get_mysql_home_path(), "pbms");
325
if (path->exists()) {
333
void MSTransactionManager::shutDown()
337
tm_Reader->release();
346
void MSTransactionManager::flush()
352
void MSTransactionManager::suspend(bool do_flush)
360
tm_Reader->suspend();
365
void MSTransactionManager::resume()
374
void MSTransactionManager::commit()
381
self->myStmtCount = 0;
382
self->myStartStmt = 0;
383
tm_Log->txn_LogTransaction(MS_CommitTxn);
389
void MSTransactionManager::rollback()
396
self->myStmtCount = 0;
397
self->myStartStmt = 0;
398
tm_Log->txn_LogTransaction(MS_RollBackTxn);
403
void MSTransactionManager::rollbackTo(uint32_t position)
407
ASSERT(self->myStmtCount > position);
411
tm_Log->txn_LogPartialRollBack(position);
416
void MSTransactionManager::dropDatabase(uint32_t db_id)
423
tm_Log->txn_dropDatabase(db_id);
428
void MSTransactionManager::logTransaction(bool ref, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
430
bool autocommit = false;
437
autocommit = ms_is_autocommit();
439
pbms_take_part_in_transaction(ms_my_get_thread());
441
self->myIsAutoCommit = autocommit;
444
// PBMS always explicitly commits
445
tm_Log->txn_LogTransaction((ref)?MS_ReferenceTxn:MS_DereferenceTxn, false /*autocommit*/, db_id, tab_id, blob_id, blob_ref_id);