1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
* PBMS transaction daemon.
30
#include "cslib/CSConfig.h"
36
#include "cslib/CSGlobal.h"
37
#include "cslib/CSStrUtil.h"
38
#include "cslib/CSLog.h"
41
#include "open_table_ms.h"
42
#include "trans_log_ms.h"
43
#include "transaction_ms.h"
44
#include "pbmsdaemon_ms.h"
47
* The pbms_ functions are utility functions supplied by ha_pbms.cc
49
void pbms_take_part_in_transaction(void *thread);
51
MSTrans *MSTransactionManager::tm_Log;
52
MSTransactionThread *MSTransactionManager::tm_Reader;
56
CSDiskValue4 lr_time_4; // The database ID for the operation.
57
CSDiskValue1 lr_state_1; // The transaction state.
58
CSDiskValue1 lr_type_1; // The transaction type. If the first bit is set then the transaction is an autocommit.
59
CSDiskValue4 lr_db_id_4; // The database ID for the operation.
60
CSDiskValue4 lr_tab_id_4; // The table ID for the operation.
61
CSDiskValue8 lr_blob_id_8; // The blob ID for the operation.
62
CSDiskValue8 lr_blob_ref_id_8;// The blob reference id.
63
} MSDiskLostRec, *MSDiskLostPtr;
66
* ---------------------------------------------------------------
67
* The transaction reader thread
70
class MSTransactionThread : public CSDaemon {
72
MSTransactionThread(MSTrans *txn_log);
74
virtual ~MSTransactionThread(){} // Do nothing here because 'self' will no longer be valid, use completeWork().
79
virtual bool doWork();
81
virtual void *completeWork();
87
void reportLostReference(MSTransPtr rec, MS_TxnState state);
88
void dereference(MSTransPtr rec, MS_TxnState state);
89
void commitReference(MSTransPtr rec, MS_TxnState state);
96
MSTransactionThread::MSTransactionThread(MSTrans *txn_log):
102
trt_log->txn_SetReader(this);
105
void MSTransactionThread::close()
108
trt_lostLog->close();
111
void MSTransactionThread::reportLostReference(MSTransPtr rec, MS_TxnState state)
114
const char *t_txt, *s_txt;
115
char b1[16], b2[16], msg[100];
119
//if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == true)
123
// Do not report errors caused by missing databases or tables.
124
// This can happen if the transaction log is reread after a crash
125
// and transactions are found that belonged to dropped databases
127
db = MSDatabase::getDatabase(rec->tr_db_id, true);
129
goto dont_worry_about_it;
132
tab = db->getTable(rec->tr_tab_id, true);
135
goto dont_worry_about_it;
143
s_txt = "RolledBack";
152
snprintf(b1, 16, "(%d)?", state);
156
switch (TRANS_TYPE(rec->tr_type)) {
157
case MS_DereferenceTxn:
158
t_txt = "Dereference";
160
case MS_ReferenceTxn:
164
snprintf(b2, 16, "(%x)?", rec->tr_type);
168
snprintf(msg, 100, "Lost PBMS record: %s %s db_id: %"PRIu32" tab_id: %"PRIu32" blob_id: %"PRIu64"", s_txt, t_txt, rec->tr_db_id, rec->tr_tab_id, rec->tr_blob_id);
169
CSL.logLine(self, CSLog::Warning, msg);
171
CS_SET_DISK_4(lrec.lr_time_4, time(NULL));
172
CS_SET_DISK_1(lrec.lr_state_1, state);
173
CS_SET_DISK_1(lrec.lr_type_1, rec->tr_type);
174
CS_SET_DISK_4(lrec.lr_db_id_4, rec->tr_db_id);
175
CS_SET_DISK_4(lrec.lr_tab_id_4, rec->tr_tab_id);
176
CS_SET_DISK_8(lrec.lr_blob_id_8, rec->tr_blob_id);
177
CS_SET_DISK_8(lrec.lr_blob_ref_id_8, rec->tr_blob_ref_id);
181
char *str = cs_strdup(trt_log->txn_GetTXNLogPath());
182
cs_remove_last_name_of_path(str);
184
path = CSPath::newPath(str, "pbms_lost_txn.dat");
187
trt_lostLog = CSFile::newFile(path);
188
trt_lostLog->open(CSFile::CREATE);
190
trt_lostLog->write(&lrec, trt_lostLog->getEOF(), sizeof(MSDiskLostRec));
198
void MSTransactionThread::dereference(MSTransPtr rec, MS_TxnState state)
204
otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
206
otab->freeReference(rec->tr_blob_id, rec->tr_blob_ref_id);
211
reportLostReference(rec, state);
218
void MSTransactionThread::commitReference(MSTransPtr rec, MS_TxnState state)
224
otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
226
otab->commitReference(rec->tr_blob_id, rec->tr_blob_ref_id);
231
reportLostReference(rec, state);
238
void MSTransactionThread::flush()
242
// For now I just wait until the transaction queue is empty or
243
// the transaction at the head of the queue has not yet been
246
// What needs to be done is for the transaction log to scan
247
// past the non commited transaction to see if there are any
248
// other committed transaction in the log and apply them if found.
250
wakeup(); // Incase the reader is sleeping.
251
while (trt_log->txn_haveNextTransaction() && !isSuspend() && self->myMustQuit)
256
bool MSTransactionThread::doWork()
261
MSTransRec rec = {0,0,0,0,0,0,0};
263
while (!myMustQuit) {
264
// This will sleep while waiting for the next
265
// completed transaction.
266
trt_log->txn_GetNextTransaction(&rec, &state);
270
if (rec.tr_db_id == 0) // The database was dropped.
273
if (state == MS_Committed){
274
if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn)
275
dereference(&rec, state);
276
else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
277
commitReference(&rec, state);
279
} else if (state == MS_RolledBack) {
280
// There is nothing to do on rollback of a dereference.
282
if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
283
dereference(&rec, state);
285
} else if (state == MS_Recovered) {
286
if ((TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn))
287
reportLostReference(&rec, state); // Report these even though they may not be lost.
289
// Because of the 2 phase commit issue with other engines I cannot
290
// just roll back the transaction because it may have been committed
291
// on the master engine. So to be safe I will always err on the side
292
// of having unreference BLOBs in the repository rather than risking
293
// deleting a BLOB that was referenced. To this end I will commit references
294
// while ignoring (rolling back) dereferences.
295
if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
296
commitReference(&rec, state);
303
self->logException();
304
CSL.logLine(NULL, CSLog::Error, "!!!!!!!! THE PBMS TRANSACTION LOG READER DIED! !!!!!!!!!!!");
310
void *MSTransactionThread::completeWork()
318
trt_lostLog->release();
323
* ---------------------------------------------------------------
324
* The transaction log manager
326
void MSTransactionManager::startUpReader()
328
char pbms_path[PATH_MAX];
331
cs_strcpy(PATH_MAX, pbms_path, PBMSDaemon::getPBMSDir());
332
cs_add_name_to_path(PATH_MAX, pbms_path, "ms-trans-log.dat");
334
tm_Log = MSTrans::txn_NewMSTrans(pbms_path);
335
new_(tm_Reader, MSTransactionThread(RETAIN(tm_Log)));
339
// Wait for the transaction reader to recover any old transaction:
345
void MSTransactionManager::startUp()
350
// Do not start the reader if the pbms dir doesn't exist.
351
path = CSPath::newPath(PBMSDaemon::getPBMSDir());
353
if (path->exists()) {
361
void MSTransactionManager::shutDown()
365
tm_Reader->release();
374
void MSTransactionManager::flush()
380
void MSTransactionManager::suspend(bool do_flush)
388
tm_Reader->suspend();
393
void MSTransactionManager::resume()
402
void MSTransactionManager::commit()
409
self->myStmtCount = 0;
410
self->myStartStmt = 0;
412
tm_Log->txn_LogTransaction(MS_CommitTxn);
418
void MSTransactionManager::rollback()
425
self->myStmtCount = 0;
426
self->myStartStmt = 0;
428
tm_Log->txn_LogTransaction(MS_RollBackTxn);
433
class MSTransactionCheckPoint: public CSString
436
MSTransactionCheckPoint(const char *name, uint32_t stmtCount ):CSString(name)
438
position = stmtCount;
445
void MSTransactionManager::setSavepoint(const char *savePoint)
447
MSTransactionCheckPoint *checkPoint;
450
new_(checkPoint, MSTransactionCheckPoint(savePoint, self->myStmtCount));
453
self->mySavePoints.add(checkPoint);
459
void MSTransactionManager::releaseSavepoint(const char *savePoint)
461
MSTransactionCheckPoint *checkPoint;
465
name = CSString::newString(savePoint);
468
checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
472
self->mySavePoints.remove(checkPoint);
477
void MSTransactionManager::rollbackTo(const char *savePoint)
479
MSTransactionCheckPoint *checkPoint;
483
name = CSString::newString(savePoint);
486
checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
490
uint32_t position = checkPoint->position;
492
self->mySavePoints.remove(checkPoint);
493
rollbackToPosition(position);
500
void MSTransactionManager::rollbackToPosition(uint32_t position)
504
ASSERT(self->myStmtCount > position);
508
tm_Log->txn_LogPartialRollBack(position);
513
void MSTransactionManager::dropDatabase(uint32_t db_id)
520
tm_Log->txn_dropDatabase(db_id);
525
void MSTransactionManager::logTransaction(bool ref, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
533
bool autocommit = false;
534
autocommit = ms_is_autocommit();
537
pbms_take_part_in_transaction(ms_my_get_thread());
540
self->myIsAutoCommit = autocommit;
543
// PBMS always explicitly commits
544
tm_Log->txn_LogTransaction((ref)?MS_ReferenceTxn:MS_DereferenceTxn, false /*autocommit*/, db_id, tab_id, blob_id, blob_ref_id);