1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
39
#include "CSDirectory.h"
40
#include "CSStrUtil.h"
42
#include "Database_ms.h"
43
#include "OpenTable_ms.h"
44
#include "backup_ms.h"
47
#include "TempLog_ms.h"
48
#include "Network_ms.h"
51
#include "Transaction_ms.h"
52
//#include "SysTab_variable.h"
53
#include "SysTab_httpheader.h"
57
uint64_t MSDatabase::gRepoThreshold;
58
uint64_t MSDatabase::gTempLogThreshold;
59
CSSyncSortedList *MSDatabase::gDatabaseList;
60
CSSparseArray *MSDatabase::gDatabaseArray;
61
uint64_t MSDatabase::gBackupDatabaseID;
63
* -------------------------------------------------------------------------
67
MSDatabase::MSDatabase():
71
myCompactorThread(NULL),
72
myTempLogThread(NULL),
73
myRepostoryList(NULL),
75
myBlobType(MS_STANDARD_STORAGE),
79
#ifdef HAVE_ALIAS_SUPPORT
92
MSDatabase::~MSDatabase()
96
iBackupThread->stop();
97
iBackupThread->release();
101
if (myTempLogThread) {
102
myTempLogThread->stop();
103
myTempLogThread->release();
104
myTempLogThread = NULL;
106
if (myCompactorThread) {
107
myRepostoryList->wakeup(); // The compator thread waits on this.
108
myCompactorThread->stop();
109
myCompactorThread->release();
110
myCompactorThread = NULL;
114
myDatabasePath->release();
117
myDatabaseName->release();
119
iWriteTempLog = NULL;
120
if (myTempLogArray) {
121
myTempLogArray->clear();
122
myTempLogArray->release();
126
iTableList->release();
129
iTableArray->clear();
130
iTableArray->release();
132
if (myRepostoryList) {
133
myRepostoryList->clear();
134
myRepostoryList->release();
136
#ifdef HAVE_ALIAS_SUPPORT
138
iBlobAliases->ma_close();
139
iBlobAliases->release();
143
myBlobCloud->release();
147
const char *MSDatabase::getDatabaseNameCString()
149
return myDatabaseName->getCString();
152
MSTable *MSDatabase::getTable(CSString *tab_name, bool create)
159
if (!(tab = (MSTable *) iTableList->find(tab_name))) {
162
/* Create a new table: */
163
tab = MSTable::newTable(iMaxTableID+1, RETAIN(tab_name), this, (off_t) 0, false);
164
iTableList->add(tab);
165
iTableArray->set(iMaxTableID+1, RETAIN(tab));
176
MSTable *MSDatabase::getTable(const char *tab_name, bool create)
178
return getTable(CSString::newString(tab_name), create);
182
MSTable *MSDatabase::getTable(uint32_t tab_id, bool missing_ok)
188
if (!(tab = (MSTable *) iTableArray->get((u_int) tab_id))) {
193
char buffer[CS_EXC_MESSAGE_SIZE];
195
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown table #");
196
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (u_int) tab_id);
197
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " in database ");
198
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, getDatabaseNameCString());
199
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, buffer);
206
MSTable *MSDatabase::getNextTable(uint32_t *pos)
213
while (i < iTableList->getSize()) {
214
tab = (MSTable *) iTableList->itemAt(i++);
215
if (!tab->isToDelete())
226
void MSDatabase::addTable(uint32_t tab_id, const char *tab_name, off_t file_size, bool to_delete)
230
if (tab_id > iMaxTableID)
231
iMaxTableID = tab_id;
232
tab = MSTable::newTable(tab_id, tab_name, this, file_size, to_delete);
233
iTableList->add(tab);
234
iTableArray->set(tab_id, RETAIN(tab));
237
void MSDatabase::addTableFromFile(CSDirectory *dir, const char *file_name, bool to_delete)
241
char tab_name[MS_TABLE_NAME_SIZE];
243
dir->info(NULL, &file_size, NULL);
244
file_id = ms_file_to_table_id(file_name);
245
ms_file_to_table_name(MS_TABLE_NAME_SIZE, tab_name, file_name);
246
addTable(file_id, tab_name, file_size, to_delete);
249
void MSDatabase::removeTable(MSTable *tab)
254
iTableList->remove(tab->myTableName);
255
iTableArray->remove(tab->myTableID);
261
void MSDatabase::dropTable(MSTable *tab)
266
iTableList->remove(tab->myTableName);
267
iTableArray->remove(tab->myTableID);
269
// Cute: you drop the table by adding it with the 'to_delete' flag set to 'true'
270
addTable(tab->myTableID, tab->myTableName->getCString(), tab->getTableFileSize(), true);
277
// This function is used when dropping tables from a database before
278
// dropping the database itself.
279
CSString *MSDatabase::getATableName()
283
CSString *name = NULL;
288
while ((tab = (MSTable *) iTableList->itemAt(i++)) && tab->isToDelete()) ;
290
name = tab->getTableName();
297
u_int MSDatabase::getTableCount()
299
u_int cnt = 0, i = 0;
305
while ((tab = (MSTable *) iTableList->itemAt(i++))) {
306
if (!tab->isToDelete())
315
void MSDatabase::renameTable(MSTable *tab, const char *to_name)
319
iTableList->remove(tab->myTableName);
320
iTableArray->remove(tab->myTableID);
322
addTable(tab->myTableID, to_name, tab->getTableFileSize(), false);
328
void MSDatabase::openWriteRepo(MSOpenTable *otab)
330
if (otab->myWriteRepo && otab->myWriteRepoFile)
334
if (!otab->myWriteRepo)
335
otab->myWriteRepo = lockRepo(0);
337
/* Now open the repo file for the open table: */
338
otab->myWriteRepo->openRepoFileForWriting(otab);
342
MSRepository *MSDatabase::getRepoFullOfTrash(time_t *ret_wait_time)
344
MSRepository *repo = NULL;
345
time_t wait_time = 0;
348
wait_time = *ret_wait_time;
350
lock_(myRepostoryList);
351
for (u_int i=0; i<myRepostoryList->size(); i++) {
353
if ((repo = (MSRepository *) myRepostoryList->get(i))) {
354
if (!repo->isRemovingFP && !repo->mustBeDeleted && !repo->isRepoLocked()) {
355
if (!repo->myRepoHeadSize) {
356
/* The file has not yet been opened, so the
357
* garbage count will not be known!
359
MSRepoFile *repo_file;
362
unlock_(myRepostoryList);
364
repo_file = repo->openRepoFile();
365
repo_file->release();
367
lock_(myRepostoryList);
370
if (repo->getGarbageLevel() >= MSRepository::gGarbageThreshold) {
371
/* Make sure there are not temp BLOBs in this repository that have
374
time_t now = time(NULL);
375
time_t then = repo->myLastTempTime;
377
/* Check if there are any temp BLOBs to be removed: */
378
if (now > then + MSTempLog::gTempBlobTimeout) {
379
repo->lockRepo(REPO_COMPACTING);
384
/* There are temp BLOBs to wait for... */
385
if (!wait_time || wait_time > MSTempLog::adjustWaitTime(then, now))
386
wait_time = MSTempLog::adjustWaitTime(then, now);
393
unlock_(myRepostoryList);
395
*ret_wait_time = wait_time;
399
MSRepository *MSDatabase::lockRepo(off_t size)
405
lock_(myRepostoryList);
406
free_slot = myRepostoryList->size();
407
/* Find an unlocked repository file that is below the write threshold: */
408
for (u_int i=0; i<myRepostoryList->size(); i++) {
409
if ((repo = (MSRepository *) myRepostoryList->get(i))) {
410
if (!repo->isRepoLocked() && !repo->isRemovingFP && !repo->mustBeDeleted &&
411
repo->myRepoFileSize + size < gRepoThreshold
412
/**/ && repo->getGarbageLevel() < MSRepository::gGarbageThreshold)
421
/* None found, create a new repo file: */
422
new_(repo, MSRepository(free_slot + 1, this, 0));
423
myRepostoryList->set(free_slot, repo);
427
repo->lockRepo(REPO_WRITE); // <- The MSRepository::backToPool() will unlock this.
428
unlock_(myRepostoryList);
432
MSRepoFile *MSDatabase::getRepoFileFromPool(uint32_t repo_id, bool missing_ok)
438
lock_(myRepostoryList);
439
if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1))) {
441
char buffer[CS_EXC_MESSAGE_SIZE];
443
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown repository file: ");
444
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (u_int) repo_id);
445
CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
447
unlock_(myRepostoryList);
450
if (repo->isRemovingFP) {
451
char buffer[CS_EXC_MESSAGE_SIZE];
453
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Repository will be removed: ");
454
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (u_int) repo_id);
455
CSException::throwException(CS_CONTEXT, MS_ERR_REMOVING_REPO, buffer);
457
repo->retain(); /* Release is here: [++] */
458
file = repo->getRepoFile();
459
unlock_(myRepostoryList);
462
file = repo->openRepoFile();
463
lock_(myRepostoryList);
464
repo->addRepoFile(file);
466
unlock_(myRepostoryList);
471
void MSDatabase::returnRepoFileToPool(MSRepoFile *file)
476
lock_(myRepostoryList);
478
if ((repo = file->myRepo)) {
479
if (repo->isRemovingFP) {
480
repo->removeRepoFile(file);
481
myRepostoryList->wakeup();
484
repo->returnRepoFile(file);
485
repo->release(); /* [++] here is the release. */
488
unlock_(myRepostoryList);
492
void MSDatabase::removeRepo(uint32_t repo_id, bool *mustQuit)
497
lock_(myRepostoryList);
498
while ((!mustQuit || !*mustQuit) && !iClosing) {
499
if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1)))
501
repo->isRemovingFP = true;
502
if (repo->removeRepoFilesNotInUse()) {
503
myRepostoryList->set(repo_id - 1, NULL);
507
* Wait for the files that are in use to be
510
myRepostoryList->wait();
512
unlock_(myRepostoryList);
516
void MSDatabase::queueTempLogEvent(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
517
uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time)
519
MSTempLogItemRec item;
522
// Each otab object holds a handle to an instance of an OPEN
523
// temp log. This is so that each thread has its own open temp log
524
// and doesn't need to open and close it constantly.
527
lock_(myTempLogArray);
528
if (!iWriteTempLog) {
529
iWriteTempLog = (MSTempLog *) myTempLogArray->last();
530
if (!iWriteTempLog) {
531
new_(iWriteTempLog, MSTempLog(1, this, 0));
532
myTempLogArray->set(1, iWriteTempLog);
535
if (!otab->myTempLogFile)
536
otab->myTempLogFile = iWriteTempLog->openTempLog();
537
else if (otab->myTempLogFile->myTempLogID != iWriteTempLog->myLogID) {
538
otab->myTempLogFile->release();
539
otab->myTempLogFile = NULL;
540
otab->myTempLogFile = iWriteTempLog->openTempLog();
543
if (iWriteTempLog->myTempLogSize >= gTempLogThreshold) {
544
u_int log_id = iWriteTempLog->myLogID + 1;
546
new_(iWriteTempLog, MSTempLog(log_id, this, 0));
547
myTempLogArray->set(log_id, iWriteTempLog);
549
otab->myTempLogFile->release();
550
otab->myTempLogFile = NULL;
551
otab->myTempLogFile = iWriteTempLog->openTempLog();
556
*log_id = iWriteTempLog->myLogID;
557
*log_offset = (uint32_t) iWriteTempLog->myTempLogSize;
560
iWriteTempLog->myTempLogSize += iWriteTempLog->myTemplogRecSize;
561
unlock_(myTempLogArray);
563
CS_SET_DISK_1(item.ti_type_1, type);
564
CS_SET_DISK_4(item.ti_table_id_4, tab_id);
565
CS_SET_DISK_6(item.ti_blob_id_6, blob_id);
566
CS_SET_DISK_4(item.ti_auth_code_4, auth_code);
567
CS_SET_DISK_4(item.ti_time_4, timev);
568
otab->myTempLogFile->write(&item, *log_offset, sizeof(MSTempLogItemRec));
574
#ifdef HAVE_ALIAS_SUPPORT
575
void MSDatabase::queueForDeletion(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
576
uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time, MSDiskAliasPtr aliasDiskRec)
580
queueTempLogEvent(otab, type, tab_id, blob_id, auth_code, log_id, log_offset, q_time);
582
// If it has an alias, remove it from the alias index.
585
deleteBlobAlias(aliasDiskRec);
588
self->logException();
596
MSTempLogFile *MSDatabase::openTempLogFile(uint32_t log_id, size_t *log_rec_size, size_t *log_head_size)
599
MSTempLogFile *log_file = NULL;
602
lock_(myTempLogArray);
604
log = (MSTempLog *) myTempLogArray->get(log_id);
606
log = (MSTempLog *) myTempLogArray->first();
608
log_file = log->openTempLog();
610
*log_rec_size = log->myTemplogRecSize;
612
*log_head_size = log->myTempLogHeadSize;
614
unlock_(myTempLogArray);
618
u_int MSDatabase::getTempLogCount()
623
lock_(myTempLogArray);
624
count = myTempLogArray->size();
625
unlock_(myTempLogArray);
629
void MSDatabase::removeTempLog(uint32_t log_id)
632
lock_(myTempLogArray);
633
myTempLogArray->remove(log_id);
634
unlock_(myTempLogArray);
638
CSObject *MSDatabase::getKey()
640
return (CSObject *) myDatabaseName;
643
int MSDatabase::compareKey(CSObject *key)
645
return myDatabaseName->compare((CSString *) key);
648
MSCompactorThread *MSDatabase::getCompactorThread()
650
return myCompactorThread;
653
CSSyncVector *MSDatabase::getRepositoryList()
655
return myRepostoryList;
658
#ifdef HAVE_ALIAS_SUPPORT
659
uint32_t MSDatabase::registerBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
662
bool can_retry = true;
666
lock_(&iBlobAliaseLock);
669
hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
673
unlock_(&iBlobAliaseLock);
675
// It can be that a duplicate alias exists that was deleted
676
// but the transaction has not been written to the repository yet.
677
// Flush all committed transactions to the repository file.
678
MSTransactionManager::flush();
686
unlock_(&iBlobAliaseLock);
690
uint32_t MSDatabase::updateBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t old_alias_hash, const char *alias)
694
lock_(&iBlobAliaseLock);
696
new_hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
697
iBlobAliases->deleteAlias(repo_id, repo_offset, old_alias_hash);
699
unlock_(&iBlobAliaseLock);
703
void MSDatabase::deleteBlobAlias(MSDiskAliasPtr diskRec)
706
lock_(&iBlobAliaseLock);
707
iBlobAliases->deleteAlias(diskRec);
708
unlock_(&iBlobAliaseLock);
712
void MSDatabase::deleteBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
714
MSDiskAliasRec diskRec;
716
CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);
717
CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);
718
CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
719
deleteBlobAlias(&diskRec);
722
void MSDatabase::moveBlobAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
725
lock_(&iBlobAliaseLock);
726
iBlobAliases->resetAlias(old_repo_id, old_repo_offset, alias_hash, new_repo_id, new_repo_offset);
727
unlock_(&iBlobAliaseLock);
732
bool MSDatabase::isValidHeaderField(const char *name)
734
bool is_valid = false;
739
if (strcasecmp(name, MS_ALIAS_TAG)) {
740
lock_(&iHTTPMetaDataHeaders);
741
header = CSString::newString(name);
744
is_valid = (iHTTPMetaDataHeaders.find(header) != NULL);
747
unlock_(&iHTTPMetaDataHeaders);
755
void MSDatabase::startUp(const char *default_http_headers)
759
new_(gDatabaseList, CSSyncSortedList);
760
new_(gDatabaseArray, CSSparseArray(5));
761
MSHTTPHeaderTable::setDefaultMetaDataHeaders(default_http_headers);
762
pbmsSystemTablesStartUp();
763
gBackupDatabaseID = 1;
767
void MSDatabase::stopThreads()
773
lock_(gDatabaseList);
775
if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
779
if (db->myTempLogThread) {
780
db->myTempLogThread->stop();
781
db->myTempLogThread->release();
782
db->myTempLogThread = NULL;
784
if (db->myCompactorThread) {
785
db->myRepostoryList->wakeup(); // The compator thread waits on this.
786
db->myCompactorThread->stop();
787
db->myCompactorThread->release();
788
db->myCompactorThread = NULL;
791
if (db->iBackupThread) {
792
db->iBackupThread->stop();
793
db->iBackupThread->release();
794
db->iBackupThread = NULL;
799
unlock_(gDatabaseList);
804
void MSDatabase::shutDown()
807
if (gDatabaseArray) {
808
gDatabaseArray->clear();
809
gDatabaseArray->release();
810
gDatabaseArray = NULL;
814
gDatabaseList->clear();
815
gDatabaseList->release();
816
gDatabaseList = NULL;
819
MSHTTPHeaderTable::releaseDefaultMetaDataHeaders();
820
pbmsSystemTableShutDown();
823
void MSDatabase::setBackupDatabase()
826
// I need to give the backup database a unique fake database ID.
827
// This is so that it is not confused with the database being backed
828
// up when opening tables.
830
// Normally database IDs are generated by time(NULL) so small database IDs
831
// are safe to use as fake IDs.
833
lock_(gDatabaseList);
834
myDatabaseID = gBackupDatabaseID++;
835
gDatabaseArray->set(myDatabaseID, RETAIN(this));
838
// Notify the cloud storage, if any, that it is a backup.
839
// This is important because if the backup database is dropped
840
// we need to be sure that only the BLOBs belonging to the
841
// backup are removed from the cloud.
842
myBlobCloud->cl_setCloudIsBackup();
844
unlock_(gDatabaseList);
846
// Rename the database path so that it is obvious that this is an incomplete backup database.
847
// When the backup is completed it will be renamed back.
848
CSPath *new_path = CSPath::newPath(myDatabasePath->concat("#"));
851
if (new_path->exists())
854
CSPath *db_path = CSPath::newPath(RETAIN(myDatabasePath));
857
db_path->rename(new_path->getNameCString());
858
myDatabasePath->release();
859
myDatabasePath = new_path->getString();
860
myDatabasePath->retain();
869
void MSDatabase::releaseBackupDatabase()
874
// The backup has completed successfully, rename the path to the correct name.
875
CSPath *db_path = CSPath::newPath(myDatabasePath->getCString());
878
myDatabasePath->setLength(myDatabasePath->length()-1);
879
db_path->rename(cs_last_name_of_path(myDatabasePath->getCString()));
882
// Remove the backup database object.
883
lock_(gDatabaseList);
884
gDatabaseArray->remove(myDatabaseID);
885
MSTableList::removeDatabaseTables(this); // Will also release the database object.
886
unlock_(gDatabaseList);
892
void MSDatabase::startBackup(MSBackupInfo *backup_info)
898
if (iBackupThread->isRunning()) {
899
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, "A backup is still running.");
901
iBackupThread->release();
902
iBackupThread = NULL;
906
iBackupThread = MSBackup::newMSBackup(backup_info);
909
iBackupThread->startBackup(RETAIN(this));
913
iBackupThread->release();
914
iBackupThread = NULL;
922
// Report the state of this database's backup through the out-parameters:
//   total        - receives iBackupThread->getBackupSize()
//   completed    - receives iBackupThread->getBackupCompletedSize()
//   completed_ok - receives true when the backup thread's status is 0
// When the second set of assignments below applies, the backup is reported
// as done and OK with zero sizes.
// NOTE(review): 'done' is presumably the return value -- confirm.
bool MSDatabase::backupStatus(uint64_t *total, uint64_t *completed, bool *completed_ok)
929
*total = iBackupThread->getBackupSize();
930
*completed = iBackupThread->getBackupCompletedSize();
931
done = !iBackupThread->isRunning();
932
// The success flag belongs in 'completed_ok'. Storing it in '*completed'
// clobbered the completed size written above and left '*completed_ok'
// unset on this path.
*completed_ok = (iBackupThread->getStatus() == 0);
934
*completed_ok = done = true;
935
*total = *completed = 0;
941
uint32_t MSDatabase::backupID()
943
return (iBackupThread)?iBackupThread->backupID(): 0;
946
void MSDatabase::terminateBackup()
949
iBackupThread->stop();
950
iBackupThread->release();
951
iBackupThread = NULL;
955
MSDatabase *MSDatabase::getDatabase(CSString *db_name, bool create)
962
lock_(gDatabaseList);
963
if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
964
db = MSDatabase::loadDatabase(RETAIN(db_name), create);
971
unlock_(gDatabaseList);
976
MSDatabase *MSDatabase::getDatabase(const char *db_name, bool create)
978
return getDatabase(CSString::newString(db_name), create);
981
MSDatabase *MSDatabase::getDatabase(uint32_t db_id)
986
lock_(gDatabaseList);
987
if ((db = (MSDatabase *) gDatabaseArray->get((u_int) db_id)))
990
// Look for the database folder with the correct ID:
991
CSPath *path = CSPath::newPath(ms_my_get_mysql_home_path(), "pbms");
993
if (path->exists()) {
995
dir = CSDirectory::newDirectory(RETAIN(path));
999
while (dir->next() && !db) {
1000
if (!dir->isFile()) {
1001
const char *ptr, *dir_name = dir->name();
1002
ptr = dir_name + strlen(dir_name) -1;
1004
while (ptr > dir_name && *ptr != '-') ptr--;
1007
int len = ptr - dir_name;
1009
if (atol(ptr) == db_id && len) {
1010
db = getDatabase(CSCString::newString(dir_name, len), true);
1011
ASSERT(db->myDatabaseID == db_id);
1020
unlock_(gDatabaseList);
1023
char buffer[CS_EXC_MESSAGE_SIZE];
1025
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database #");
1026
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (u_int) db_id);
1027
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
1032
void MSDatabase::wakeTempLogThreads()
1039
lock_(gDatabaseList);
1040
for (int i=0;;i++) {
1041
if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
1043
if (db->myTempLogThread)
1044
db->myTempLogThread->wakeup();
1046
unlock_(gDatabaseList);
1050
// Determine the numeric ID of database 'db_name' by scanning the entries
// under 'path'. A non-file entry named "<db_name>-<digits>" supplies the
// ID from its trailing digits. The second loop re-scans the directory to
// check whether a candidate ID is already used as the suffix of another
// entry, sleeping one second between attempts.
// NOTE(review): how the candidate ID is generated when no match is found is
// not shown in the visible statements -- confirm before relying on it.
uint32_t MSDatabase::getDBID(CSPath *path, CSString *db_name)
1054
int len = db_name->length();
1061
// Search for the ID of the database
1062
dir = CSDirectory::newDirectory(RETAIN(path));
1065
while (dir->next() && !db_id)
1067
if (!dir->isFile()){
1068
ptr = dir->name() + strlen(dir->name()) -1;
1069
// isdigit() is only defined for values representable as unsigned char
// (or EOF). Cast so that names containing bytes >= 0x80 cannot trigger
// undefined behaviour on platforms where plain char is signed.
while (ptr > dir->name() && isdigit((unsigned char) *ptr)) ptr--;
1070
if ((*ptr == '-') && (len == (ptr - dir->name())) && !db_name->compare(dir->name(), len) ) {
1071
db_id = atol(ptr+1);
1080
while (1) { // search for a unique db_id
1081
dir = CSDirectory::newDirectory(RETAIN(path));
1084
while (db_id && dir->next()) {
1085
if (!dir->isFile()) {
1086
ptr = dir->name() + strlen(dir->name()) -1;
1087
// Same unsigned char cast as above: keep isdigit() well defined.
while (ptr > dir->name() && isdigit((unsigned char) *ptr)) ptr--;
1088
if ((*ptr == '-') && (db_id == atol(ptr+1))) {
1096
sleep(1); // Allow 1 second to pass.
1106
CSPath *MSDatabase::createDatabasePath(const char *location, CSString *db_name, uint32_t *db_id_ptr, bool *create, bool is_pbms)
1108
bool create_path = *create;
1109
CSPath *path = NULL;
1110
char name_buffer[MS_DATABASE_NAME_SIZE + 40];
1116
path = CSPath::newPath(location, "pbms");
1118
if (!path->exists()) {
1129
// If this is the pbms database then nothing more is to be done.
1133
if ((!db_id_ptr) || !*db_id_ptr) {
1134
db_id = getDBID(RETAIN(path), RETAIN(db_name));
1140
// Create the PBMS database name with ID
1141
cs_strcpy(MS_DATABASE_NAME_SIZE + 40, name_buffer, db_name->getCString());
1142
cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, "-");
1143
cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, (u_int) db_id);
1146
path = CSPath::newPath(path, name_buffer);
1148
if (!path->exists()) {
1168
MSDatabase *MSDatabase::newDatabase(const char *db_location, CSString *db_name, uint32_t db_id, bool create)
1170
MSDatabase *db = NULL;
1174
const char *file_name;
1178
u_int to_delete = 0;
1180
bool is_pbms = false;
1186
//is_pbms = (strcmp(db_name->getCString(), "pbms") == 0); To be done later.
1188
/* Block the creation of the pbms database if there is no MySQL database. */
1189
path = CSPath::newPath(ms_my_get_mysql_home_path(), RETAIN(db_name));
1191
if (create && !path->exists()) {
1192
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1196
// Create the database path, if 'create' == false then it can return NULL
1197
path = createDatabasePath(db_location, RETAIN(db_name), &db_id, &create, is_pbms);
1204
// Create the database object and initialize it.
1205
if (!(db = new MSDatabase())) {
1206
CSException::throwOSError(CS_CONTEXT, ENOMEM);
1209
db->myIsPBMS = is_pbms;
1210
db_path = path->getString();
1215
db->iNextBlobRefId = (uint32_t) time(NULL);
1216
db->iNextBlobRefId <<= 32;
1217
db->iNextBlobRefId = COMMIT_MASK(db->iNextBlobRefId);
1218
db->iNextBlobRefId++;
1220
db->myDatabaseID = db_id;
1221
db->myDatabasePath = db_path;
1222
db->myDatabaseName = db_name;
1223
new_(db->myBlobCloud, CloudDB(db_id));
1230
new_(db->myTempLogArray, CSSyncSparseArray(20));
1231
new_(db->iTableList, CSSyncSortedList());
1232
new_(db->iTableArray, CSSparseArray(20));
1233
new_(db->myRepostoryList, CSSyncVector(20));
1236
#ifdef HAVE_ALIAS_SUPPORT
1237
//db->retain(); no retain here, MSAlias() takes a back ref.
1238
new_(db->iBlobAliases, MSAlias(db));
1240
/* "Load" the database: */
1242
/* Get the max table ID: */
1243
dir = CSDirectory::newDirectory(RETAIN(db_path));
1246
while (dir->next()) {
1247
file_name = dir->name();
1248
if (dir->isFile() && cs_is_extension(file_name, "bst"))
1249
db->addTableFromFile(dir, file_name, false);
1253
path = CSPath::newPath(RETAIN(db_path), "bs-repository");
1254
if (path->exists()) {
1255
dir = CSDirectory::newDirectory(path);
1258
while (dir->next()) {
1259
file_name = dir->name();
1260
if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1261
if ((file_id = ms_file_to_table_id(file_name, "repo"))) {
1262
dir->info(NULL, &file_size, NULL);
1263
new_(repo, MSRepository(file_id, db, file_size));
1264
db->myRepostoryList->set(file_id - 1, repo);
1275
path = CSPath::newPath(RETAIN(db_path), "bs-logs");
1276
if (path->exists()) {
1277
dir = CSDirectory::newDirectory(path);
1280
while (dir->next()) {
1281
file_name = dir->name();
1282
if (dir->isFile()) {
1283
if (cs_is_extension(file_name, "bs")) {
1284
if ((file_id = ms_file_to_table_id(file_name, "temp"))) {
1285
dir->info(NULL, &file_size, NULL);
1286
new_(log, MSTempLog(file_id, db, file_size));
1287
db->myTempLogArray->set(file_id, log);
1290
else if (cs_is_extension(file_name, "bst")) {
1291
db->addTableFromFile(dir, file_name, true);
1304
/* Go through and prepare all the tables that are to
1310
while ((tab = (MSTable *) db->iTableList->itemAt(i))) {
1311
if (tab->isToDelete())
1312
tab->prepareToDelete();
1323
void MSDatabase::startThreads()
1330
#ifdef HAVE_ALIAS_SUPPORT
1331
// iBlobAliases->ma_open() must be called before starting any threads.
1332
iBlobAliases->ma_open();
1335
new_(myTempLogThread, MSTempLogThread(1 * 1000, this));
1336
myTempLogThread->start();
1338
#ifdef MS_COMPACTOR_POLLS
1339
new_(myCompactorThread, MSCompactorThread(MS_COMPACTOR_POLL_FREQ, this));
1341
new_(myCompactorThread, MSCompactorThread(MS_DEFAULT_COMPACTOR_WAIT * 1000, this));
1345
myCompactorThread->start();
1350
void MSDatabase::dropDatabase()
1357
if (iBackupThread) {
1358
iBackupThread->stop();
1359
iBackupThread->release();
1360
iBackupThread = NULL;
1363
if (myTempLogThread) {
1364
myTempLogThread->stop();
1365
myTempLogThread->release();
1366
myTempLogThread = NULL;
1369
if (myCompactorThread) {
1370
myRepostoryList->wakeup(); // The compator thread waits on this.
1371
myCompactorThread->stop();
1372
myCompactorThread->release();
1373
myCompactorThread = NULL;
1376
// Call cloud drop database even if the database is not currently
1377
// using cloud storage, just in case it was in the past. If the connection
1378
// to the cloud is not setup then nothing will be done.
1380
myBlobCloud->cl_dropDB();
1383
self->logException();
1389
void MSDatabase::removeDatabasePath(CSString *doomedDatabasePath )
1391
CSPath *path = NULL;
1392
CSDirectory *dir = NULL;
1393
const char *file_name;
1396
push_(doomedDatabasePath);
1398
// Delete repository files
1399
path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-repository");
1401
if (path->exists()) {
1402
dir = CSDirectory::newDirectory(RETAIN(path));
1405
while (dir->next()) {
1406
file_name = dir->name();
1407
if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1412
if (path->isEmpty())
1417
// Delete temp log files.
1418
path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-logs");
1420
if (path->exists()) {
1421
dir = CSDirectory::newDirectory(RETAIN(path));
1424
while (dir->next()) {
1425
file_name = dir->name();
1426
if (dir->isFile() && (cs_is_extension(file_name, "bs") || cs_is_extension(file_name, "bst"))) {
1431
if (path->isEmpty())
1436
// Delete table reference files.
1437
dir = CSDirectory::newDirectory(RETAIN(doomedDatabasePath));
1440
while (dir->next()) {
1441
file_name = dir->name();
1442
if (dir->isFile() && cs_is_extension(file_name, "bst"))
1447
#ifdef HAVE_ALIAS_SUPPORT
1448
path = CSPath::newPath(RETAIN(doomedDatabasePath), ACTIVE_ALIAS_INDEX);
1454
pbms_remove_ststem_tables(RETAIN(doomedDatabasePath));
1456
path = CSPath::newPath(RETAIN(doomedDatabasePath));
1458
if (path->isEmpty() && !path->isLink()) {
1461
CSStringBuffer *new_name;
1462
// If the database folder is not empty we rename it to get it out of the way.
1463
// If it is not renamed it will be reused if a database with the same name is
1464
// created again, which will result in the database ID being reused, which may
1465
// have some bad side effects.
1466
new_(new_name, CSStringBuffer());
1468
new_name->append(cs_last_name_of_path(doomedDatabasePath->getCString()));
1469
new_name->append("_DROPPED");
1470
path->rename(new_name->getCString());
1475
release_(doomedDatabasePath);
1477
path = CSPath::newPath(ms_my_get_mysql_home_path(), "pbms");
1479
if (path->isEmpty() && !path->isLink()) {
1487
/* Drop the PBMS database if it exists.
1488
* The root folder 'pbms' will be deleted also
1489
* if it is empty and not a symbolic link.
1490
* The database folder in 'pbms' is deleted if it is empty and
1491
* it is not a symbolic link.
1493
void MSDatabase::dropDatabase(MSDatabase *doomedDatabase, const char *db_name )
1495
CSString *doomedDatabasePath = NULL;
1499
if (doomedDatabase) {
1500
push_(doomedDatabase);
1502
// Remove any pending transactions for the dropped database.
1503
// This is important because if the database is restored it will have the
1504
// same database ID and the old transactions would be applied to it.
1505
MSTransactionManager::dropDatabase(doomedDatabase->myDatabaseID);
1507
doomedDatabasePath = doomedDatabase->myDatabasePath;
1508
doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1510
MSTableList::removeDatabaseTables(RETAIN(doomedDatabase));
1512
doomedDatabase->dropDatabase(); // Shutdown database threads.
1514
// To avoid a deadlock a lock is not taken on the database list
1515
// if shutdown is in progress. The only database that would be
1516
// dropped during a shutdown is an incomplete backup database.
1517
ASSERT(doomedDatabase->isBackup || !self->myMustQuit);
1518
if (!self->myMustQuit)
1519
lock_(gDatabaseList); // Be sure to shutdown the database before locking this or it can lead to deadlocks
1521
gDatabaseArray->remove(doomedDatabase->myDatabaseID);
1522
if (!doomedDatabase->isBackup)
1523
gDatabaseList->remove(doomedDatabase->getKey());
1524
if (!self->myMustQuit)
1525
unlock_(gDatabaseList);
1526
ASSERT(doomedDatabase->iRefCount == 1);
1527
release_(doomedDatabase);
1531
bool create = false;
1534
path = createDatabasePath(ms_my_get_mysql_home_path(), CSString::newString(db_name), &db_id, &create);
1537
MSTransactionManager::dropDatabase(db_id);
1540
doomedDatabasePath = path->getString();
1541
doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1546
if (doomedDatabasePath)
1547
removeDatabasePath(doomedDatabasePath);
1553
void MSDatabase::dropDatabase(const char *db_name )
1556
dropDatabase(getDatabase(db_name, false), db_name);
1560
// The table_path can be several things here:
1561
// 1: <absolute path>/<database>/<table>
1562
// 2: <absolute path>/<database>
1563
// 3: <database>/<table>
1564
bool MSDatabase::convertTablePathToIDs(const char *table_path, uint32_t *db_id, uint32_t *tab_id, bool create)
1566
const char *base = ms_my_get_mysql_home_path();
1567
CSString *table_url;
1568
CSString *db_path = NULL;
1569
CSString *db_name = NULL;
1570
CSString *tab_name = NULL;
1577
table_url = CSString::newString(table_path);
1578
if (table_url->startsWith(base)) {
1579
table_url = table_url->right(base);
1584
db_path = table_url->left("/", -1);
1586
tab_name = table_url->right("/", -1);
1589
release_(table_url);
1591
if (db_path->length() == 0) { // Only a database name was supplied.
1596
if (tab_name->length() == 0) {
1597
tab_name->release();
1602
db_name = db_path->right("/", -1);
1604
if (db_name->length() == 0) {
1613
db = MSDatabase::getDatabase(db_name, create); // This will release db_name
1615
*db_id = db->myDatabaseID;
1620
tab = db->getTable(tab_name, create);// This will release tab_name
1623
*tab_id = tab->myTableID;
1631
return_((*tab_id > 0) && (*db_id > 0));
1634
bool MSDatabase::convertTableAndDatabaseToIDs(const char *db_name, const char *tab_name, uint32_t *db_id, uint32_t *tab_id, bool create)
1642
db = MSDatabase::getDatabase(db_name, create);
1645
*db_id = db->myDatabaseID;
1648
tab = db->getTable(tab_name, create);
1650
*tab_id = tab->myTableID;
1658
return_((*tab_id > 0) && (*db_id > 0));
1661
MSDatabase *MSDatabase::loadDatabase(CSString *db_name, bool create)
1666
db = newDatabase(ms_my_get_mysql_home_path(), db_name, 0, create);
1671
gDatabaseList->add(RETAIN(db));
1673
gDatabaseArray->set(db->myDatabaseID, RETAIN(db));
1675
pbms_load_system_tables(RETAIN(db));
1682
uint32_t MSDatabase::getDatabaseID(CSString *db_name, bool create)
1690
lock_(gDatabaseList);
1691
if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1692
db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1695
id = db->myDatabaseID;
1698
id = db->myDatabaseID;
1701
unlock_(gDatabaseList);
1707
uint32_t MSDatabase::getDatabaseID(const char *db_name, bool create)
1709
return getDatabaseID(CSString::newString(db_name), create);
1712
MSDatabase *MSDatabase::getBackupDatabase(CSString *db_location, CSString *db_name, uint32_t db_id, bool create)
1714
bool was_created = create;
1721
// If the db already exists and 'create' == true then the existing db
1723
// Create the database path, if 'create' == false then it can return NULL
1724
path = createDatabasePath(db_location->getCString(), RETAIN(db_name), &db_id, &was_created);
1726
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1730
// If we wanted to create it but it already exists then throw an error.
1731
if ( create && !was_created) {
1733
snprintf(str, 120, "Duplicate database: %s", db_name->getCString());
1734
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, str);
1739
// everything looks OK
1740
db = newDatabase(db_location->getCString(), db_name, db_id, create);
1741
db->setBackupDatabase();
1742
release_(db_location);