1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
30
#include "cslib/CSConfig.h"
36
#include "cslib/CSGlobal.h"
37
#include "cslib/CSStrUtil.h"
38
#include "cslib/CSStorage.h"
40
#include "temp_log_ms.h"
41
#include "open_table_ms.h"
42
#include "trans_log_ms.h"
43
#include "transaction_ms.h"
44
#include "parameters_ms.h"
47
// Search the transaction log for a MS_ReferenceTxn record for the given BLOB.
48
// Just search the log file and not the cache. Seaching the cache may be faster but
49
// it would require locks that could block the writers or reader threads and in the worse
50
// case it will still require the reading of the log anyway.
52
// This search doesn't distinguish between transactions that are still running and
53
// transactions that are rolled back.
54
class SearchTXNLog : ReadTXNLog {
56
SearchTXNLog(uint32_t db_id, MSTrans *log): ReadTXNLog(log), st_db_id(db_id) {}
66
virtual bool rl_CanContinue() { return ((!st_found) || !st_terminated);}
67
virtual void rl_Load(uint64_t log_position, MSTransPtr rec)
71
if ( !st_found && (TRANS_TYPE(rec->tr_type) != MS_ReferenceTxn))
75
if ((rec->tr_db_id == st_db_id) && (rec->tr_tab_id == st_tab_id) && (rec->tr_blob_id == st_blob_id)) {
81
st_terminated = TRANS_IS_TERMINATED(rec->tr_type);
83
st_commited = (TRANS_IS_AUTOCOMMIT(rec->tr_type) || (TRANS_TYPE(rec->tr_type) == MS_CommitTxn));
86
bool st_FindBlobRef(bool *committed, uint32_t tab_id, uint64_t blob_id)
89
st_found = st_terminated = st_commited = false;
93
rl_ReadLog(rl_log->txn_GetStartPosition(), false);
94
*committed = st_commited;
99
MSTempLogFile::MSTempLogFile():
100
CSReadBufferedFile(),
106
MSTempLogFile::~MSTempLogFile()
110
myTempLog->release();
113
MSTempLogFile *MSTempLogFile::newTempLogFile(uint32_t id, MSTempLog *temp_log, CSFile *file)
121
if (!(f = new MSTempLogFile()))
122
CSException::throwOSError(CS_CONTEXT, ENOMEM);
130
f->myTempLog = temp_log;
134
MSTempLog::MSTempLog(uint32_t id, MSDatabase *db, off64_t file_size):
137
myTempLogSize(file_size),
139
myTempLogHeadSize(0),
145
MSTempLog::~MSTempLog()
159
void MSTempLog::deleteLog()
164
CSPath *MSTempLog::getLogPath()
168
cs_strcpy(120, file_name, "bs-logs");
169
cs_add_dir_char(120, file_name);
170
cs_strcat(120, file_name, "temp-");
171
cs_strcat(120, file_name, myLogID);
172
cs_strcat(120, file_name, ".bs");
173
return CSPath::newPath(RETAIN(iLogDatabase->myDatabasePath), file_name);
176
MSTempLogFile *MSTempLog::openTempLog()
184
fh = MSTempLogFile::newTempLogFile(myLogID, this, CSFile::newFile(path));
187
fh->open(CSFile::DEFAULT);
189
fh->open(CSFile::CREATE);
190
if (!myTempLogHeadSize) {
191
MSTempLogHeadRec head;
193
lock_(iLogDatabase->myTempLogArray);
194
/* Check again after locking: */
195
if (!myTempLogHeadSize) {
198
if (fh->read(&head, 0, offsetof(MSTempLogHeadRec, th_reserved_4), 0) < offsetof(MSTempLogHeadRec, th_reserved_4)) {
199
CS_SET_DISK_4(head.th_magic_4, MS_TEMP_LOG_MAGIC);
200
CS_SET_DISK_2(head.th_version_2, MS_TEMP_LOG_VERSION);
201
CS_SET_DISK_2(head.th_head_size_2, MS_TEMP_LOG_HEAD_SIZE);
202
CS_SET_DISK_2(head.th_rec_size_2, sizeof(MSTempLogItemRec));
203
CS_SET_DISK_4(head.th_reserved_4, 0);
204
fh->write(&head, 0, sizeof(MSTempLogHeadRec));
208
/* Check the file header: */
209
if (CS_GET_DISK_4(head.th_magic_4) != MS_TEMP_LOG_MAGIC)
210
CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_BAD_HEADER_MAGIC);
211
if (CS_GET_DISK_2(head.th_version_2) > MS_TEMP_LOG_VERSION)
212
CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_VERSION_TOO_NEW);
214
/* Load the header details: */
215
myTempLogHeadSize = CS_GET_DISK_2(head.th_head_size_2);
216
myTemplogRecSize = CS_GET_DISK_2(head.th_rec_size_2);
218
/* File size, cannot be less than header size, adjust to correct offset: */
219
if (myTempLogSize < myTempLogHeadSize)
220
myTempLogSize = myTempLogHeadSize;
221
if ((rem = (myTempLogSize - myTempLogHeadSize) % myTemplogRecSize))
222
myTempLogSize += myTemplogRecSize - rem;
224
unlock_(iLogDatabase->myTempLogArray);
230
time_t MSTempLog::adjustWaitTime(time_t then, time_t now)
234
if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
235
wait = ((then + PBMSParameters::getTempBlobTimeout() - now) * 1000);
238
else if (wait > 120 * 1000)
248
* ---------------------------------------------------------------
252
MSTempLogThread::MSTempLogThread(time_t wait_time, MSDatabase *db):
253
CSDaemon(wait_time, NULL),
254
iTempLogDatabase(db),
262
void MSTempLogThread::close()
265
iTempLogFile->release();
270
bool MSTempLogThread::try_ReleaseBLOBReference(CSThread *self, CSStringBuffer *buffer, uint32_t tab_id, int type, uint64_t blob_id, uint32_t auth_code)
272
volatile bool rtc = true;
274
/* Release the BLOB reference. */
277
if (type == MS_TL_REPO_REF) {
278
MSRepoFile *repo_file;
280
if ((repo_file = iTempLogDatabase->getRepoFileFromPool(tab_id, true))) {
281
frompool_(repo_file);
282
repo_file->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
283
backtopool_(repo_file);
287
if ((otab = MSTableList::getOpenTableByID(iTempLogDatabase->myDatabaseID, tab_id))) {
289
if (type == MS_TL_BLOB_REF) {
290
otab->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
294
ASSERT(type == MS_TL_TABLE_REF);
295
if ((type == MS_TL_TABLE_REF) && otab->deleteReferences(iTempLogFile->myTempLogID, iLogOffset, &myMustQuit)) {
296
/* Delete the file now... */
299
MSOpenTablePool *tab_pool;
301
tab = otab->getDBTable();
302
from_path = otab->getDBTable()->getTableFile();
310
tab_pool = MSTableList::lockTablePoolForDeletion(otab); // This returns otab to the pool.
313
from_path->removeFile();
314
tab->myDatabase->removeTable(tab);
316
backtopool_(tab_pool); // The will unlock and close the table pool freeing all tables in it.
317
pop_(tab); // Returning the pool will have released this. (YUK!)
334
bool MSTempLogThread::doWork()
337
MSTempLogItemRec log_item;
338
CSStringBuffer *buffer;
339
SearchTXNLog txn_log(iTempLogDatabase->myDatabaseID, MSTransactionManager::tm_Log);
342
new_(buffer, CSStringBuffer(20));
344
while (!myMustQuit) {
347
if (!(iTempLogFile = iTempLogDatabase->openTempLogFile(0, &iLogRecSize, &head_size))) {
351
iLogOffset = head_size;
354
tfer = iTempLogFile->read(&log_item, iLogOffset, sizeof(MSTempLogItemRec), 0);
356
/* No more data to be read: */
358
/* Check to see if there is a log after this: */
359
if (iTempLogDatabase->getTempLogCount() <= 1) {
360
/* The next log does not yet exist. We wait for
361
* it to be created before we delete and
362
* close the current log.
364
myWaitTime = PBMSParameters::getTempBlobTimeout() * 1000;
368
iTempLogFile->myTempLog->deleteLog();
369
iTempLogDatabase->removeTempLog(iTempLogFile->myTempLogID);
372
else if (tfer == sizeof(MSTempLogItemRec)) {
373
/* We have a record: */
382
* Items in the temp log are never updated.
383
* If a temp operation is canceled then the object
384
* records this itself and when the temp operation
385
* is attempted it will recognize by the templog
386
* id and offset that it is no longer a valid
389
tab_id = CS_GET_DISK_4(log_item.ti_table_id_4);
391
type = CS_GET_DISK_1(log_item.ti_type_1);
392
blob_id = CS_GET_DISK_6(log_item.ti_blob_id_6);
393
auth_code = CS_GET_DISK_4(log_item.ti_auth_code_4);
394
then = CS_GET_DISK_4(log_item.ti_time_4);
397
if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
398
/* Time has not yet exired, adjust wait time: */
399
myWaitTime = MSTempLog::adjustWaitTime(then, now);
403
if (try_ReleaseBLOBReference(self, buffer, tab_id, type, blob_id, auth_code)) {
404
int err = self->myException.getErrorCode();
406
if (err == MS_ERR_TABLE_LOCKED) {
409
else if (err == MS_ERR_REMOVING_REPO) {
410
/* Wait for the compactor to finish: */
411
myWaitTime = 2 * 1000;
415
else if ((err == MS_ERR_UNKNOWN_TABLE) || (err == MS_ERR_DATABASE_DELETED))
418
self->myException.log(NULL);
423
// Only part of the data read, don't wait very long to try again:
424
myWaitTime = 2 * 1000;
427
iLogOffset += iLogRecSize;
434
void *MSTempLogThread::completeWork()