1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
* PBMS transaction daemon.
30
#include "cslib/CSConfig.h"
36
#include "cslib/CSGlobal.h"
37
#include "cslib/CSStrUtil.h"
38
#include "cslib/CSLog.h"
41
#include "open_table_ms.h"
42
#include "trans_log_ms.h"
43
#include "transaction_ms.h"
44
#include "pbmsdaemon_ms.h"
47
* The pbms_ functions are utility functions supplied by ha_pbms.cc
49
void pbms_take_part_in_transaction(void *thread);
51
MSTrans *MSTransactionManager::tm_Log;
52
MSTransactionThread *MSTransactionManager::tm_Reader;
56
CSDiskValue4 lr_time_4; // The time the lost record was reported, as returned by time(NULL).
57
CSDiskValue1 lr_state_1; // The transaction state.
58
CSDiskValue1 lr_type_1; // The transaction type. If the first bit is set then the transaction is an autocommit.
59
CSDiskValue4 lr_db_id_4; // The database ID for the operation.
60
CSDiskValue4 lr_tab_id_4; // The table ID for the operation.
61
CSDiskValue8 lr_blob_id_8; // The blob ID for the operation.
62
CSDiskValue8 lr_blob_ref_id_8;// The blob reference id.
63
} MSDiskLostRec, *MSDiskLostPtr;
66
* ---------------------------------------------------------------
67
* The transaction reader thread
70
class MSTransactionThread : public CSDaemon {
72
MSTransactionThread(MSTrans *txn_log);
74
virtual ~MSTransactionThread(){} // Do nothing here because 'self' will no longer be valid, use completeWork().
79
virtual bool doWork();
81
virtual void *completeWork();
87
void reportLostReference(MSTransPtr rec, MS_TxnState state);
88
void dereference(MSTransPtr rec, MS_TxnState state);
89
void commitReference(MSTransPtr rec, MS_TxnState state);
96
MSTransactionThread::MSTransactionThread(MSTrans *txn_log):
102
trt_log->txn_SetReader(this);
105
void MSTransactionThread::close()
108
trt_lostLog->close();
111
void MSTransactionThread::reportLostReference(MSTransPtr rec, MS_TxnState state)
114
const char *t_txt, *s_txt;
115
char b1[16], b2[16], msg[100];
117
//if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == true)
127
s_txt = "RolledBack";
136
snprintf(b1, 16, "(%d)?", state);
140
switch (TRANS_TYPE(rec->tr_type)) {
141
case MS_DereferenceTxn:
142
t_txt = "Dereference";
144
case MS_ReferenceTxn:
148
snprintf(b2, 16, "(%x)?", rec->tr_type);
152
// Build the warning text for the lost record. C++11 and later require whitespace
// between a string literal and the PRI* macros; without it the macro name is
// lexed as a literal suffix. The formatted output is unchanged.
snprintf(msg, 100, "Lost PBMS record: %s %s db_id: %" PRIu32 " tab_id: %" PRIu32 " blob_id: %" PRIu64, s_txt, t_txt, rec->tr_db_id, rec->tr_tab_id, rec->tr_blob_id);
153
CSL.logLine(self, CSLog::Warning, msg);
155
CS_SET_DISK_4(lrec.lr_time_4, time(NULL));
156
CS_SET_DISK_1(lrec.lr_state_1, state);
157
CS_SET_DISK_1(lrec.lr_type_1, rec->tr_type);
158
CS_SET_DISK_4(lrec.lr_db_id_4, rec->tr_db_id);
159
CS_SET_DISK_4(lrec.lr_tab_id_4, rec->tr_tab_id);
160
CS_SET_DISK_8(lrec.lr_blob_id_8, rec->tr_blob_id);
161
CS_SET_DISK_8(lrec.lr_blob_ref_id_8, rec->tr_blob_ref_id);
165
char *str = cs_strdup(trt_log->txn_GetTXNLogPath());
166
cs_remove_last_name_of_path(str);
168
path = CSPath::newPath(str, "pbms_lost_txn.dat");
171
trt_lostLog = CSFile::newFile(path);
172
trt_lostLog->open(CSFile::CREATE);
174
trt_lostLog->write(&lrec, trt_lostLog->getEOF(), sizeof(MSDiskLostRec));
180
void MSTransactionThread::dereference(MSTransPtr rec, MS_TxnState state)
186
otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
188
otab->freeReference(rec->tr_blob_id, rec->tr_blob_ref_id);
193
reportLostReference(rec, state);
200
void MSTransactionThread::commitReference(MSTransPtr rec, MS_TxnState state)
206
otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
208
otab->commitReference(rec->tr_blob_id, rec->tr_blob_ref_id);
213
reportLostReference(rec, state);
220
void MSTransactionThread::flush()
224
// For now I just wait until the transaction queue is empty or
225
// the transaction at the head of the queue has not yet been
228
// What needs to be done is for the transaction log to scan
229
// past the non-committed transaction to see if there are any
230
// other committed transactions in the log and apply them if found.
232
wakeup(); // In case the reader is sleeping.
233
// Wait while transactions are still pending, the reader is not suspended, and
// no quit has been requested. The quit flag was previously tested un-negated,
// so the wait only ran once a quit had already been requested; it is now
// negated, consistent with the `!myMustQuit` loop test in doWork().
while (trt_log->txn_haveNextTransaction() && !isSuspend() && !self->myMustQuit)
238
bool MSTransactionThread::doWork()
243
MSTransRec rec = {0,0,0,0,0,0,0};
245
while (!myMustQuit) {
246
// This will sleep while waiting for the next
247
// completed transaction.
248
trt_log->txn_GetNextTransaction(&rec, &state);
252
if (rec.tr_db_id == 0) // The database was dropped.
255
if (state == MS_Committed){
256
if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn)
257
dereference(&rec, state);
258
else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
259
commitReference(&rec, state);
261
} else if (state == MS_RolledBack) {
262
// There is nothing to do on rollback of a dereference.
264
if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
265
dereference(&rec, state);
267
} else if (state == MS_Recovered) {
268
if ((TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn))
269
reportLostReference(&rec, state); // Report these even though they may not be lost.
271
// Because of the 2 phase commit issue with other engines I cannot
272
// just roll back the transaction because it may have been committed
273
// on the master engine. So to be safe I will always err on the side
274
// of having unreferenced BLOBs in the repository rather than risking
275
// deleting a BLOB that was referenced. To this end I will commit references
276
// while ignoring (rolling back) dereferences.
277
if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
278
commitReference(&rec, state);
285
self->logException();
286
CSL.logLine(NULL, CSLog::Error, "!!!!!!!! THE PBMS TRANSACTION LOG READER DIED! !!!!!!!!!!!");
292
void *MSTransactionThread::completeWork()
300
trt_lostLog->release();
305
* ---------------------------------------------------------------
306
* The transaction log manager
308
void MSTransactionManager::startUpReader()
310
char pbms_path[PATH_MAX];
313
cs_strcpy(PATH_MAX, pbms_path, PBMSDaemon::getPBMSDir());
314
cs_add_name_to_path(PATH_MAX, pbms_path, "ms-trans-log.dat");
316
tm_Log = MSTrans::txn_NewMSTrans(pbms_path);
317
new_(tm_Reader, MSTransactionThread(RETAIN(tm_Log)));
321
// Wait for the transaction reader to recover any old transaction:
327
void MSTransactionManager::startUp()
332
// Do not start the reader if the pbms dir doesn't exist.
333
path = CSPath::newPath(PBMSDaemon::getPBMSDir());
335
if (path->exists()) {
343
void MSTransactionManager::shutDown()
347
tm_Reader->release();
356
void MSTransactionManager::flush()
362
void MSTransactionManager::suspend(bool do_flush)
370
tm_Reader->suspend();
375
void MSTransactionManager::resume()
384
void MSTransactionManager::commit()
391
self->myStmtCount = 0;
392
self->myStartStmt = 0;
394
tm_Log->txn_LogTransaction(MS_CommitTxn);
400
void MSTransactionManager::rollback()
407
self->myStmtCount = 0;
408
self->myStartStmt = 0;
410
tm_Log->txn_LogTransaction(MS_RollBackTxn);
415
class MSTransactionCheckPoint: public CSString
418
MSTransactionCheckPoint(const char *name, uint32_t stmtCount ):CSString(name)
420
position = stmtCount;
427
void MSTransactionManager::setSavepoint(const char *savePoint)
429
MSTransactionCheckPoint *checkPoint;
432
new_(checkPoint, MSTransactionCheckPoint(savePoint, self->myStmtCount));
435
self->mySavePoints.add(checkPoint);
441
void MSTransactionManager::releaseSavepoint(const char *savePoint)
443
MSTransactionCheckPoint *checkPoint;
447
name = CSString::newString(savePoint);
450
checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
454
self->mySavePoints.remove(checkPoint);
459
void MSTransactionManager::rollbackTo(const char *savePoint)
461
MSTransactionCheckPoint *checkPoint;
465
name = CSString::newString(savePoint);
468
checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
472
uint32_t position = checkPoint->position;
474
self->mySavePoints.remove(checkPoint);
475
rollbackToPosition(position);
482
void MSTransactionManager::rollbackToPosition(uint32_t position)
486
ASSERT(self->myStmtCount > position);
490
tm_Log->txn_LogPartialRollBack(position);
495
void MSTransactionManager::dropDatabase(uint32_t db_id)
502
tm_Log->txn_dropDatabase(db_id);
507
void MSTransactionManager::logTransaction(bool ref, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
515
bool autocommit = false;
516
autocommit = ms_is_autocommit();
519
pbms_take_part_in_transaction(ms_my_get_thread());
522
self->myIsAutoCommit = autocommit;
525
// PBMS always explicitly commits
526
tm_Log->txn_LogTransaction((ref)?MS_ReferenceTxn:MS_DereferenceTxn, false /*autocommit*/, db_id, tab_id, blob_id, blob_ref_id);