1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
32
#include <drizzled/common.h>
33
#include <drizzled/session.h>
34
#include <drizzled/table.h>
35
#include <drizzled/message/table.pb.h>
36
#include "drizzled/charset_info.h"
37
#include <drizzled/table_proto.h>
38
#include <drizzled/field.h>
41
#include "cslib/CSConfig.h"
49
#include "cslib/CSGlobal.h"
50
#include "cslib/CSLog.h"
51
#include "cslib/CSDirectory.h"
52
#include "cslib/CSStrUtil.h"
54
#include "database_ms.h"
55
#include "open_table_ms.h"
56
#include "backup_ms.h"
58
#include "temp_log_ms.h"
59
#include "network_ms.h"
62
#include "transaction_ms.h"
63
//#include "systab_variable_ms.h"
64
#include "systab_httpheader_ms.h"
65
#include "parameters_ms.h"
66
#include "pbmsdaemon_ms.h"
70
CSSyncSortedList *MSDatabase::gDatabaseList;
71
CSSparseArray *MSDatabase::gDatabaseArray;
73
* -------------------------------------------------------------------------
77
MSDatabase::MSDatabase():
83
myCompactorThread(NULL),
84
myTempLogThread(NULL),
85
myRepostoryList(NULL),
87
myBlobType(MS_STANDARD_STORAGE),
92
#ifdef HAVE_ALIAS_SUPPORT
106
MSDatabase::~MSDatabase()
110
iBackupThread->stop();
111
iBackupThread->release();
112
iBackupThread = NULL;
115
if (myTempLogThread) {
116
myTempLogThread->stop();
117
myTempLogThread->release();
118
myTempLogThread = NULL;
120
if (myCompactorThread) {
121
myRepostoryList->wakeup(); // The compator thread waits on this.
122
myCompactorThread->stop();
123
myCompactorThread->release();
124
myCompactorThread = NULL;
128
myDatabasePath->release();
131
myDatabaseName->release();
133
iWriteTempLog = NULL;
134
if (myTempLogArray) {
135
myTempLogArray->clear();
136
myTempLogArray->release();
140
iTableList->release();
143
iTableArray->clear();
144
iTableArray->release();
146
if (myRepostoryList) {
147
myRepostoryList->clear();
148
myRepostoryList->release();
150
#ifdef HAVE_ALIAS_SUPPORT
152
iBlobAliases->ma_close();
153
iBlobAliases->release();
157
myBlobCloud->release();
161
uint32_t MSDatabase::fileToTableId(const char *file_name, const char *name_part)
166
const char *num = file_name + strlen(file_name) - 1;
168
while (num >= file_name && *num != '-')
171
/* Check the name part of the file: */
172
int len = strlen(name_part);
174
if (len != num - file_name)
176
if (strncmp(file_name, name_part, len) != 0)
181
sscanf(num, "%"PRIu32"", &value);
186
const char *MSDatabase::fileToTableName(size_t size, char *tab_name, const char *file_name)
191
file_name = cs_last_name_of_path(file_name);
192
cptr = file_name + strlen(file_name) - 1;
193
while (cptr > file_name && *cptr != '.')
195
if (cptr > file_name && *cptr == '.') {
196
if (strncmp(cptr, ".bs", 2) == 0) {
198
while (cptr > file_name && isdigit(*cptr))
203
len = cptr - file_name;
207
memcpy(tab_name, file_name, len);
210
/* Return a pointer to what was removed! */
211
return file_name + len;
215
const char *MSDatabase::getDatabaseNameCString()
217
return myDatabaseName->getCString();
220
MSTable *MSDatabase::getTable(CSString *tab_name, bool create)
227
if (!(tab = (MSTable *) iTableList->find(tab_name))) {
230
/* Create a new table: */
231
tab = MSTable::newTable(iMaxTableID+1, RETAIN(tab_name), this, (off64_t) 0, false);
232
iTableList->add(tab);
233
iTableArray->set(iMaxTableID+1, RETAIN(tab));
244
MSTable *MSDatabase::getTable(const char *tab_name, bool create)
246
return getTable(CSString::newString(tab_name), create);
250
MSTable *MSDatabase::getTable(uint32_t tab_id, bool missing_ok)
256
if (!(tab = (MSTable *) iTableArray->get((uint32_t) tab_id))) {
261
char buffer[CS_EXC_MESSAGE_SIZE];
263
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown table #");
264
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) tab_id);
265
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " in database ");
266
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, getDatabaseNameCString());
267
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, buffer);
274
MSTable *MSDatabase::getNextTable(uint32_t *pos)
281
while (i < iTableList->getSize()) {
282
tab = (MSTable *) iTableList->itemAt(i++);
283
if (!tab->isToDelete())
294
void MSDatabase::addTable(uint32_t tab_id, const char *tab_name, off64_t file_size, bool to_delete)
298
if (tab_id > iMaxTableID)
299
iMaxTableID = tab_id;
300
tab = MSTable::newTable(tab_id, tab_name, this, file_size, to_delete);
301
iTableList->add(tab);
302
iTableArray->set(tab_id, RETAIN(tab));
305
void MSDatabase::addTableFromFile(CSDirectory *dir, const char *file_name, bool to_delete)
309
char tab_name[MS_TABLE_NAME_SIZE];
311
dir->info(NULL, &file_size, NULL);
312
file_id = fileToTableId(file_name);
313
fileToTableName(MS_TABLE_NAME_SIZE, tab_name, file_name);
314
addTable(file_id, tab_name, file_size, to_delete);
317
void MSDatabase::removeTable(MSTable *tab)
322
iTableList->remove(tab->myTableName);
323
iTableArray->remove(tab->myTableID);
329
void MSDatabase::dropTable(MSTable *tab)
334
iTableList->remove(tab->myTableName);
335
iTableArray->remove(tab->myTableID);
337
// Cute: you drop the table by adding it with the 'to_delete' flag set to 'true'
338
addTable(tab->myTableID, tab->myTableName->getCString(), tab->getTableFileSize(), true);
345
// This function is used when dropping tables from a database before
346
// dropping the database itself.
347
CSString *MSDatabase::getATableName()
351
CSString *name = NULL;
356
while ((tab = (MSTable *) iTableList->itemAt(i++)) && tab->isToDelete()) ;
358
name = tab->getTableName();
365
uint32_t MSDatabase::getTableCount()
367
uint32_t cnt = 0, i = 0;
373
while ((tab = (MSTable *) iTableList->itemAt(i++))) {
374
if (!tab->isToDelete())
383
void MSDatabase::renameTable(MSTable *tab, const char *to_name)
387
iTableList->remove(tab->myTableName);
388
iTableArray->remove(tab->myTableID);
390
addTable(tab->myTableID, to_name, tab->getTableFileSize(), false);
396
void MSDatabase::openWriteRepo(MSOpenTable *otab)
398
if (otab->myWriteRepo && otab->myWriteRepoFile)
402
if (!otab->myWriteRepo)
403
otab->myWriteRepo = lockRepo(0);
405
/* Now open the repo file for the open table: */
406
otab->myWriteRepo->openRepoFileForWriting(otab);
410
MSRepository *MSDatabase::getRepoFullOfTrash(time_t *ret_wait_time)
412
MSRepository *repo = NULL;
413
time_t wait_time = 0;
416
wait_time = *ret_wait_time;
418
lock_(myRepostoryList);
419
for (uint32_t i=0; i<myRepostoryList->size(); i++) {
421
if ((repo = (MSRepository *) myRepostoryList->get(i))) {
422
if (!repo->isRemovingFP && !repo->mustBeDeleted && !repo->isRepoLocked()) {
423
if (!repo->myRepoHeadSize) {
424
/* The file has not yet been opened, so the
425
* garbage count will not be known!
427
MSRepoFile *repo_file;
430
unlock_(myRepostoryList);
432
repo_file = repo->openRepoFile();
433
repo_file->release();
435
lock_(myRepostoryList);
438
if (repo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
439
/* Make sure there are not temp BLOBs in this repository that have
442
time_t now = time(NULL);
443
time_t then = repo->myLastTempTime;
445
/* Check if there are any temp BLOBs to be removed: */
446
if (now > (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
447
repo->lockRepo(REPO_COMPACTING);
452
/* There are temp BLOBs to wait for... */
453
if (!wait_time || wait_time > MSTempLog::adjustWaitTime(then, now))
454
wait_time = MSTempLog::adjustWaitTime(then, now);
461
unlock_(myRepostoryList);
463
*ret_wait_time = wait_time;
467
MSRepository *MSDatabase::lockRepo(off64_t size)
473
lock_(myRepostoryList);
474
free_slot = myRepostoryList->size();
475
/* Find an unlocked repository file that is below the write threshold: */
476
for (uint32_t i=0; i<myRepostoryList->size(); i++) {
477
if ((repo = (MSRepository *) myRepostoryList->get(i))) {
478
if ((!repo->isRepoLocked()) && (!repo->isRemovingFP) && (!repo->mustBeDeleted) &&
479
((repo->myRepoFileSize + size) < PBMSParameters::getRepoThreshold())
480
/**/ && (repo->getGarbageLevel() < PBMSParameters::getGarbageThreshold()))
489
/* None found, create a new repo file: */
490
new_(repo, MSRepository(free_slot + 1, this, 0));
491
myRepostoryList->set(free_slot, repo);
495
repo->lockRepo(REPO_WRITE); // <- The MSRepository::backToPool() will unlock this.
496
unlock_(myRepostoryList);
500
MSRepoFile *MSDatabase::getRepoFileFromPool(uint32_t repo_id, bool missing_ok)
506
lock_(myRepostoryList);
507
if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1))) {
509
char buffer[CS_EXC_MESSAGE_SIZE];
511
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown repository file: ");
512
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
513
CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
515
unlock_(myRepostoryList);
518
if (repo->isRemovingFP) {
519
char buffer[CS_EXC_MESSAGE_SIZE];
521
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Repository will be removed: ");
522
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
523
CSException::throwException(CS_CONTEXT, MS_ERR_REMOVING_REPO, buffer);
525
repo->retain(); /* Release is here: [++] */
526
file = repo->getRepoFile();
527
unlock_(myRepostoryList);
530
file = repo->openRepoFile();
531
lock_(myRepostoryList);
532
repo->addRepoFile(file);
534
unlock_(myRepostoryList);
539
void MSDatabase::returnRepoFileToPool(MSRepoFile *file)
544
lock_(myRepostoryList);
546
if ((repo = file->myRepo)) {
547
if (repo->isRemovingFP) {
548
repo->removeRepoFile(RETAIN(file));
549
myRepostoryList->wakeup();
552
repo->returnRepoFile(RETAIN(file));
553
repo->release(); /* [++] here is the release. */
556
unlock_(myRepostoryList);
560
void MSDatabase::removeRepo(uint32_t repo_id, bool *mustQuit)
565
lock_(myRepostoryList);
566
while ((!mustQuit || !*mustQuit) && !iClosing) {
567
if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1)))
569
repo->isRemovingFP = true;
570
if (repo->removeRepoFilesNotInUse()) {
571
myRepostoryList->set(repo_id - 1, NULL);
575
* Wait for the files that are in use to be
578
myRepostoryList->wait();
580
unlock_(myRepostoryList);
584
void MSDatabase::queueTempLogEvent(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
585
uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time)
587
MSTempLogItemRec item;
590
// Each otab object holds an handle to an instance of an OPEN
591
// temp log. This is so that each thread has it's own open temp log
592
// and doesn't need to be opened and close it constantly.
595
lock_(myTempLogArray);
596
if (!iWriteTempLog) {
597
iWriteTempLog = (MSTempLog *) myTempLogArray->last();
598
if (!iWriteTempLog) {
599
new_(iWriteTempLog, MSTempLog(1, this, 0));
600
myTempLogArray->set(1, iWriteTempLog);
603
if (!otab->myTempLogFile)
604
otab->myTempLogFile = iWriteTempLog->openTempLog();
605
else if (otab->myTempLogFile->myTempLogID != iWriteTempLog->myLogID) {
606
otab->myTempLogFile->release();
607
otab->myTempLogFile = NULL;
608
otab->myTempLogFile = iWriteTempLog->openTempLog();
611
if (iWriteTempLog->myTempLogSize >= PBMSParameters::getTempLogThreshold()) {
612
uint32_t tmp_log_id = iWriteTempLog->myLogID + 1;
614
new_(iWriteTempLog, MSTempLog(tmp_log_id, this, 0));
615
myTempLogArray->set(tmp_log_id, iWriteTempLog);
617
otab->myTempLogFile->release();
618
otab->myTempLogFile = NULL;
619
otab->myTempLogFile = iWriteTempLog->openTempLog();
624
*log_id = iWriteTempLog->myLogID;
625
*log_offset = (uint32_t) iWriteTempLog->myTempLogSize;
628
iWriteTempLog->myTempLogSize += iWriteTempLog->myTemplogRecSize;
629
unlock_(myTempLogArray);
631
CS_SET_DISK_1(item.ti_type_1, type);
632
CS_SET_DISK_4(item.ti_table_id_4, tab_id);
633
CS_SET_DISK_6(item.ti_blob_id_6, blob_id);
634
CS_SET_DISK_4(item.ti_auth_code_4, auth_code);
635
CS_SET_DISK_4(item.ti_time_4, timev);
636
otab->myTempLogFile->write(&item, *log_offset, sizeof(MSTempLogItemRec));
642
#ifdef HAVE_ALIAS_SUPPORT
643
void MSDatabase::queueForDeletion(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
644
uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time, MSDiskAliasPtr aliasDiskRec)
648
queueTempLogEvent(otab, type, tab_id, blob_id, auth_code, log_id, log_offset, q_time);
650
// If it has an alias remove it from the ailias index.
653
deleteBlobAlias(aliasDiskRec);
656
self->logException();
664
MSTempLogFile *MSDatabase::openTempLogFile(uint32_t log_id, size_t *log_rec_size, size_t *log_head_size)
667
MSTempLogFile *log_file = NULL;
670
lock_(myTempLogArray);
672
log = (MSTempLog *) myTempLogArray->get(log_id);
674
log = (MSTempLog *) myTempLogArray->first();
676
log_file = log->openTempLog();
678
*log_rec_size = log->myTemplogRecSize;
680
*log_head_size = log->myTempLogHeadSize;
682
unlock_(myTempLogArray);
686
uint32_t MSDatabase::getTempLogCount()
691
lock_(myTempLogArray);
692
count = myTempLogArray->size();
693
unlock_(myTempLogArray);
697
void MSDatabase::removeTempLog(uint32_t log_id)
700
lock_(myTempLogArray);
701
myTempLogArray->remove(log_id);
702
unlock_(myTempLogArray);
706
CSObject *MSDatabase::getKey()
708
return (CSObject *) myDatabaseName;
711
int MSDatabase::compareKey(CSObject *key)
713
return myDatabaseName->compare((CSString *) key);
716
MSCompactorThread *MSDatabase::getCompactorThread()
718
return myCompactorThread;
721
CSSyncVector *MSDatabase::getRepositoryList()
723
return myRepostoryList;
726
#ifdef HAVE_ALIAS_SUPPORT
727
uint32_t MSDatabase::registerBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
730
bool can_retry = true;
734
lock_(&iBlobAliaseLock);
737
hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
741
unlock_(&iBlobAliaseLock);
743
// It can be that a duplicater alias exists that was deleted
744
// but the transaction has not been written to the repository yet.
745
// Flush all committed transactions to the repository file.
746
MSTransactionManager::flush();
754
unlock_(&iBlobAliaseLock);
758
uint32_t MSDatabase::updateBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t old_alias_hash, const char *alias)
762
lock_(&iBlobAliaseLock);
764
new_hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
765
iBlobAliases->deleteAlias(repo_id, repo_offset, old_alias_hash);
767
unlock_(&iBlobAliaseLock);
771
void MSDatabase::deleteBlobAlias(MSDiskAliasPtr diskRec)
774
lock_(&iBlobAliaseLock);
775
iBlobAliases->deleteAlias(diskRec);
776
unlock_(&iBlobAliaseLock);
780
void MSDatabase::deleteBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
782
MSDiskAliasRec diskRec;
784
CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);
785
CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);
786
CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
787
deleteBlobAlias(&diskRec);
790
void MSDatabase::moveBlobAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
793
lock_(&iBlobAliaseLock);
794
iBlobAliases->resetAlias(old_repo_id, old_repo_offset, alias_hash, new_repo_id, new_repo_offset);
795
unlock_(&iBlobAliaseLock);
800
bool MSDatabase::isValidHeaderField(const char *name)
802
bool is_valid = false;
807
if (strcasecmp(name, MS_ALIAS_TAG)) {
808
lock_(&iHTTPMetaDataHeaders);
809
header = CSString::newString(name);
812
is_valid = (iHTTPMetaDataHeaders.find(header) != NULL);
815
unlock_(&iHTTPMetaDataHeaders);
823
void MSDatabase::startUp(const char *default_http_headers)
827
new_(gDatabaseList, CSSyncSortedList);
828
new_(gDatabaseArray, CSSparseArray(5));
829
MSHTTPHeaderTable::setDefaultMetaDataHeaders(default_http_headers);
830
PBMSSystemTables::systemTablesStartUp();
831
PBMSParameters::setBackupDatabaseID(1);
835
void MSDatabase::stopThreads()
841
lock_(gDatabaseList);
843
if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
847
if (db->myTempLogThread) {
848
db->myTempLogThread->stop();
849
db->myTempLogThread->release();
850
db->myTempLogThread = NULL;
852
if (db->myCompactorThread) {
853
db->myRepostoryList->wakeup(); // The compator thread waits on this.
854
db->myCompactorThread->stop();
855
db->myCompactorThread->release();
856
db->myCompactorThread = NULL;
859
if (db->iBackupThread) {
860
db->iBackupThread->stop();
861
db->iBackupThread->release();
862
db->iBackupThread = NULL;
867
unlock_(gDatabaseList);
872
void MSDatabase::shutDown()
875
if (gDatabaseArray) {
876
gDatabaseArray->clear();
877
gDatabaseArray->release();
878
gDatabaseArray = NULL;
882
gDatabaseList->clear();
883
gDatabaseList->release();
884
gDatabaseList = NULL;
887
MSHTTPHeaderTable::releaseDefaultMetaDataHeaders();
888
PBMSSystemTables::systemTableShutDown();
891
void MSDatabase::setBackupDatabase()
894
// I need to give the backup database a unique fake database ID.
895
// This is so that it is not confused with the database being backed
896
// backed up when opening tables.
898
// Normally database IDs are generated by time(NULL) so small database IDs
899
// are safe to use as fake IDs.
901
lock_(gDatabaseList);
902
myDatabaseID = PBMSParameters::getBackupDatabaseID() +1;
903
PBMSParameters::setBackupDatabaseID(myDatabaseID);
904
gDatabaseArray->set(myDatabaseID, RETAIN(this));
907
// Notify the cloud storage, if any, that it is a backup.
908
// This is important because if the backup database is dropped
909
// we need to be sure that only the BLOBs belonging to the
910
// backup are removed from the cloud.
911
myBlobCloud->cl_setCloudIsBackup();
913
unlock_(gDatabaseList);
915
// Rename the database path so that it is obviouse that this is an incomplete backup database.
916
// When the backup is completed it will be renamed back.
917
CSPath *new_path = CSPath::newPath(myDatabasePath->concat("#"));
920
if (new_path->exists())
923
CSPath *db_path = CSPath::newPath(RETAIN(myDatabasePath));
926
db_path->rename(new_path->getNameCString());
927
myDatabasePath->release();
928
myDatabasePath = new_path->getString();
929
myDatabasePath->retain();
938
void MSDatabase::releaseBackupDatabase()
943
// The backup has completed succefully, rename the path to the correct name.
944
CSPath *db_path = CSPath::newPath(myDatabasePath->getCString());
947
myDatabasePath->setLength(myDatabasePath->length()-1);
948
db_path->rename(cs_last_name_of_path(myDatabasePath->getCString()));
951
// Remove the backup database object.
952
lock_(gDatabaseList);
953
gDatabaseArray->remove(myDatabaseID);
954
MSTableList::removeDatabaseTables(this); // Will also release the database object.
955
unlock_(gDatabaseList);
961
void MSDatabase::startBackup(MSBackupInfo *backup_info)
967
if (iBackupThread->isRunning()) {
968
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, "A backup is still running.");
970
iBackupThread->release();
971
iBackupThread = NULL;
975
iBackupThread = MSBackup::newMSBackup(backup_info);
978
iBackupThread->startBackup(RETAIN(this));
982
iBackupThread->release();
983
iBackupThread = NULL;
991
bool MSDatabase::backupStatus(uint64_t *total, uint64_t *completed, bool *completed_ok)
998
*total = iBackupThread->getBackupSize();
999
*completed = iBackupThread->getBackupCompletedSize();
1000
done = !iBackupThread->isRunning();
1001
*completed = (iBackupThread->getStatus() == 0);
1003
*completed_ok = done = true;
1004
*total = *completed = 0;
1010
uint32_t MSDatabase::backupID()
1012
return (iBackupThread)?iBackupThread->backupID(): 0;
1015
void MSDatabase::terminateBackup()
1017
if (iBackupThread) {
1018
iBackupThread->stop();
1019
iBackupThread->release();
1020
iBackupThread = NULL;
1024
MSDatabase *MSDatabase::getDatabase(CSString *db_name, bool create)
1031
lock_(gDatabaseList);
1032
if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1033
db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1040
unlock_(gDatabaseList);
1045
MSDatabase *MSDatabase::getDatabase(const char *db_name, bool create)
1047
return getDatabase(CSString::newString(db_name), create);
1050
MSDatabase *MSDatabase::getDatabase(uint32_t db_id)
1055
lock_(gDatabaseList);
1056
if ((db = (MSDatabase *) gDatabaseArray->get((uint32_t) db_id)))
1059
// Look for the database folder with the correct ID:
1060
CSPath *path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1062
if (path->exists()) {
1064
dir = CSDirectory::newDirectory(RETAIN(path));
1068
while (dir->next() && !db) {
1069
if (!dir->isFile()) {
1070
const char *ptr, *dir_name = dir->name();
1071
ptr = dir_name + strlen(dir_name) -1;
1073
while (ptr > dir_name && *ptr != '-') ptr--;
1076
int len = ptr - dir_name;
1078
if ((strtoul(ptr, NULL, 10) == db_id) && len) {
1079
db = getDatabase(CSString::newString(dir_name, len), true);
1080
ASSERT(db->myDatabaseID == db_id);
1089
unlock_(gDatabaseList);
1092
char buffer[CS_EXC_MESSAGE_SIZE];
1094
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database #");
1095
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) db_id);
1096
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
1101
void MSDatabase::wakeTempLogThreads()
1108
lock_(gDatabaseList);
1109
for (int i=0;;i++) {
1110
if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
1112
if (db->myTempLogThread)
1113
db->myTempLogThread->wakeup();
1115
unlock_(gDatabaseList);
1119
uint32_t MSDatabase::getDBID(CSPath *path, CSString *db_name)
1123
int len = db_name->length();
1130
// Search for the ID of the database
1131
dir = CSDirectory::newDirectory(RETAIN(path));
1134
while (dir->next() && !db_id)
1136
if (!dir->isFile()){
1137
ptr = dir->name() + strlen(dir->name()) -1;
1138
while (ptr > dir->name() && isdigit(*ptr)) ptr--;
1139
if ((*ptr == '-') && (len == (ptr - dir->name())) && !db_name->compare(dir->name(), len) ) {
1140
db_id = atol(ptr+1);
1149
while (1) { // search for a unique db_id
1150
dir = CSDirectory::newDirectory(RETAIN(path));
1153
while (db_id && dir->next()) {
1154
if (!dir->isFile()) {
1155
ptr = dir->name() + strlen(dir->name()) -1;
1156
while (ptr > dir->name() && isdigit(*ptr)) ptr--;
1157
if ((*ptr == '-') && (db_id == strtoul(ptr+1, NULL, 10))) {
1165
sleep(1); // Allow 1 second to pass.
1175
CSPath *MSDatabase::createDatabasePath(const char *location, CSString *db_name, uint32_t *db_id_ptr, bool *create, bool is_pbms)
1177
bool create_path = *create;
1178
CSPath *path = NULL;
1179
char name_buffer[MS_DATABASE_NAME_SIZE + 40];
1185
path = CSPath::newPath(location, "pbms");
1187
if (!path->exists()) {
1198
// If this is the pbms database then nothing more is to be done.
1202
if ((!db_id_ptr) || !*db_id_ptr) {
1203
db_id = getDBID(RETAIN(path), RETAIN(db_name));
1209
// Create the PBMS database name with ID
1210
cs_strcpy(MS_DATABASE_NAME_SIZE + 40, name_buffer, db_name->getCString());
1211
cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, "-");
1212
cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, (uint32_t) db_id);
1215
path = CSPath::newPath(path, name_buffer);
1217
if (!path->exists()) {
1237
MSDatabase *MSDatabase::newDatabase(const char *db_location, CSString *db_name, uint32_t db_id, bool create)
1239
MSDatabase *db = NULL;
1243
const char *file_name;
1247
uint32_t to_delete = 0;
1249
bool is_pbms = false;
1255
//is_pbms = (strcmp(db_name->getCString(), "pbms") == 0); To be done later.
1258
* Block the creation of the pbms database if there is no MySQL database.
1259
* The database name is case sensitive here if the file system names are
1260
* case sensitive. This is desirable.
1262
path = CSPath::newPath(ms_my_get_mysql_home_path(), RETAIN(db_name));
1264
if (create && !path->exists()) {
1265
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1269
// Create the database path, if 'create' == false then it can return NULL
1270
path = createDatabasePath(db_location, RETAIN(db_name), &db_id, &create, is_pbms);
1277
// Create the database object and initialize it.
1278
if (!(db = new MSDatabase())) {
1279
CSException::throwOSError(CS_CONTEXT, ENOMEM);
1282
db->myIsPBMS = is_pbms;
1283
db_path = path->getString();
1288
db->iNextBlobRefId = (uint32_t) time(NULL);
1289
db->iNextBlobRefId <<= 32;
1290
db->iNextBlobRefId = COMMIT_MASK(db->iNextBlobRefId);
1291
db->iNextBlobRefId++;
1293
db->myDatabaseID = db_id;
1294
db->myDatabasePath = db_path;
1295
db->myDatabaseName = db_name;
1296
new_(db->myBlobCloud, CloudDB(db_id));
1303
new_(db->myTempLogArray, CSSyncSparseArray(20));
1304
new_(db->iTableList, CSSyncSortedList());
1305
new_(db->iTableArray, CSSparseArray(20));
1306
new_(db->myRepostoryList, CSSyncVector(20));
1309
#ifdef HAVE_ALIAS_SUPPORT
1310
//db->retain(); no retain here, MSAlias() takes a back ref.
1311
new_(db->iBlobAliases, MSAlias(db));
1313
/* "Load" the database: */
1315
/* Get the max table ID: */
1316
dir = CSDirectory::newDirectory(RETAIN(db_path));
1319
while (dir->next()) {
1320
file_name = dir->name();
1321
if (dir->isFile() && cs_is_extension(file_name, "bst"))
1322
db->addTableFromFile(dir, file_name, false);
1326
path = CSPath::newPath(RETAIN(db_path), "bs-repository");
1327
if (path->exists()) {
1328
dir = CSDirectory::newDirectory(path);
1331
while (dir->next()) {
1332
file_name = dir->name();
1333
if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1334
if ((file_id = fileToTableId(file_name, "repo"))) {
1335
dir->info(NULL, &file_size, NULL);
1336
new_(repo, MSRepository(file_id, db, file_size));
1337
db->myRepostoryList->set(file_id - 1, repo);
1348
path = CSPath::newPath(RETAIN(db_path), "bs-logs");
1349
if (path->exists()) {
1350
dir = CSDirectory::newDirectory(path);
1353
while (dir->next()) {
1354
file_name = dir->name();
1355
if (dir->isFile()) {
1356
if (cs_is_extension(file_name, "bs")) {
1357
if ((file_id = fileToTableId(file_name, "temp"))) {
1358
dir->info(NULL, &file_size, NULL);
1359
new_(log, MSTempLog(file_id, db, file_size));
1360
db->myTempLogArray->set(file_id, log);
1363
else if (cs_is_extension(file_name, "bst")) {
1364
db->addTableFromFile(dir, file_name, true);
1377
/* Go through and prepare all the tables that are to
1383
while ((tab = (MSTable *) db->iTableList->itemAt(i))) {
1384
if (tab->isToDelete())
1385
tab->prepareToDelete();
1396
void MSDatabase::startThreads()
1403
#ifdef HAVE_ALIAS_SUPPORT
1404
// iBlobAliases->ma_open() must be called before starting any threads.
1405
iBlobAliases->ma_open();
1408
new_(myTempLogThread, MSTempLogThread(1 * 1000, this));
1409
myTempLogThread->start();
1411
#ifdef MS_COMPACTOR_POLLS
1412
new_(myCompactorThread, MSCompactorThread(MS_COMPACTOR_POLL_FREQ, this));
1414
new_(myCompactorThread, MSCompactorThread(MS_DEFAULT_COMPACTOR_WAIT * 1000, this));
1418
myCompactorThread->start();
1423
void MSDatabase::dropDatabase()
1430
if (iBackupThread) {
1431
iBackupThread->stop();
1432
iBackupThread->release();
1433
iBackupThread = NULL;
1436
if (myTempLogThread) {
1437
myTempLogThread->stop();
1438
myTempLogThread->release();
1439
myTempLogThread = NULL;
1442
if (myCompactorThread) {
1443
myRepostoryList->wakeup(); // The compator thread waits on this.
1444
myCompactorThread->stop();
1445
myCompactorThread->release();
1446
myCompactorThread = NULL;
1449
// Call cloud drop database even if the database is not currrently
1450
// using cloud storage just in case to was in the past. If the connection
1451
// to the cloud is not setup then nothing will be done.
1453
myBlobCloud->cl_dropDB();
1456
self->logException();
1462
void MSDatabase::removeDatabasePath(CSString *doomedDatabasePath )
1464
CSPath *path = NULL;
1465
CSDirectory *dir = NULL;
1466
const char *file_name;
1469
push_(doomedDatabasePath);
1471
// Delete repository files
1472
path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-repository");
1474
if (path->exists()) {
1475
dir = CSDirectory::newDirectory(RETAIN(path));
1478
while (dir->next()) {
1479
file_name = dir->name();
1480
if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1485
if (path->isEmpty())
1490
// Delete temp log files.
1491
path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-logs");
1493
if (path->exists()) {
1494
dir = CSDirectory::newDirectory(RETAIN(path));
1497
while (dir->next()) {
1498
file_name = dir->name();
1499
if (dir->isFile() && (cs_is_extension(file_name, "bs") || cs_is_extension(file_name, "bst"))) {
1504
if (path->isEmpty())
1509
// Delete table reference files.
1510
dir = CSDirectory::newDirectory(RETAIN(doomedDatabasePath));
1513
while (dir->next()) {
1514
file_name = dir->name();
1515
if (dir->isFile() && cs_is_extension(file_name, "bst"))
1520
#ifdef HAVE_ALIAS_SUPPORT
1521
path = CSPath::newPath(RETAIN(doomedDatabasePath), ACTIVE_ALIAS_INDEX);
1527
PBMSSystemTables::removeSystemTables(RETAIN(doomedDatabasePath));
1529
path = CSPath::newPath(RETAIN(doomedDatabasePath));
1531
if (path->isEmpty() && !path->isLink()) {
1534
CSStringBuffer *new_name;
1535
// If the database folder is not empty we rename it to get it out of the way.
1536
// If it is not renamed it will be reused if a database with the same name is
1537
// created again wich will result in the database ID being reused which may
1538
// have some bad side effects.
1539
new_(new_name, CSStringBuffer());
1541
new_name->append(cs_last_name_of_path(doomedDatabasePath->getCString()));
1542
new_name->append("_DROPPED");
1543
path->rename(new_name->getCString());
1548
release_(doomedDatabasePath);
1550
path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1552
if (path->isEmpty() && !path->isLink()) {
1560
/* Drop the PBMS database if it exists.
1561
* The root folder 'pbms' will be deleted also
1562
* if it is empty and not a symbolic link.
1563
* The database folder in 'pbms' is deleted if it is empty and
1564
* it is not a symbolic link.
1566
void MSDatabase::dropDatabase(MSDatabase *doomedDatabase, const char *db_name )
1568
CSString *doomedDatabasePath = NULL;
1572
if (doomedDatabase) {
1573
push_(doomedDatabase);
1575
// Remove any pending transactions for the dropped database.
1576
// This is important because if the database is restored it will have the
1577
// same database ID and the old transactions would be applied to it.
1578
MSTransactionManager::dropDatabase(doomedDatabase->myDatabaseID);
1580
doomedDatabasePath = doomedDatabase->myDatabasePath;
1581
doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1583
MSTableList::removeDatabaseTables(RETAIN(doomedDatabase));
1584
MSSystemTableShare::removeDatabaseSystemTables(RETAIN(doomedDatabase));
1586
doomedDatabase->dropDatabase(); // Shutdown database threads.
1588
// To avoid a deadlock a lock is not taken on the database list
1589
// if shutdown is in progress. The only database that would be
1590
// dropped during a shutdown is an incomplete backup database.
1591
ASSERT(doomedDatabase->isBackup || !self->myMustQuit);
1592
if (!self->myMustQuit)
1593
lock_(gDatabaseList); // Be sure to shutdown the database before locking this or it can lead to deadlocks
1595
gDatabaseArray->remove(doomedDatabase->myDatabaseID);
1596
if (!doomedDatabase->isBackup)
1597
gDatabaseList->remove(doomedDatabase->getKey());
1598
if (!self->myMustQuit)
1599
unlock_(gDatabaseList);
1600
ASSERT(doomedDatabase->getRefCount() == 1);
1601
release_(doomedDatabase);
1605
bool create = false;
1608
path = createDatabasePath(ms_my_get_mysql_home_path(), CSString::newString(db_name), &db_id, &create);
1611
MSTransactionManager::dropDatabase(db_id);
1614
doomedDatabasePath = path->getString();
1615
doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1620
if (doomedDatabasePath)
1621
removeDatabasePath(doomedDatabasePath);
1626
void MSDatabase::dropDatabase(const char *db_name )
1629
dropDatabase(getDatabase(db_name, false), db_name);
1633
// The table_path can be several things here:
1634
// 1: <absalute path>/<database>/<table>
1635
// 2: <absalute path>/<database>
1636
// 3: <database>/<table>
1637
bool MSDatabase::convertTablePathToIDs(const char *table_path, uint32_t *db_id, uint32_t *tab_id, bool create)
1639
const char *base = ms_my_get_mysql_home_path();
1640
CSString *table_url;
1641
CSString *db_path = NULL;
1642
CSString *db_name = NULL;
1643
CSString *tab_name = NULL;
1650
table_url = CSString::newString(table_path);
1651
if (table_url->startsWith(base)) {
1652
table_url = table_url->right(base);
1657
db_path = table_url->left("/", -1);
1659
tab_name = table_url->right("/", -1);
1662
release_(table_url);
1664
if (db_path->length() == 0) { // Only a database name was supplied.
1669
if (tab_name->length() == 0) {
1670
tab_name->release();
1675
db_name = db_path->right("/", -1);
1677
if (db_name->length() == 0) {
1686
db = MSDatabase::getDatabase(db_name, create); // This will release db_name
1688
*db_id = db->myDatabaseID;
1693
tab = db->getTable(tab_name, create);// This will release tab_name
1696
*tab_id = tab->myTableID;
1704
return_((*tab_id > 0) && (*db_id > 0));
1707
bool MSDatabase::convertTableAndDatabaseToIDs(const char *db_name, const char *tab_name, uint32_t *db_id, uint32_t *tab_id, bool create)
1715
db = MSDatabase::getDatabase(db_name, create);
1718
*db_id = db->myDatabaseID;
1721
tab = db->getTable(tab_name, create);
1723
*tab_id = tab->myTableID;
1731
return_((*tab_id > 0) && (*db_id > 0));
1734
MSDatabase *MSDatabase::loadDatabase(CSString *db_name, bool create)
1739
db = newDatabase(ms_my_get_mysql_home_path(), db_name, 0, create);
1744
gDatabaseList->add(RETAIN(db));
1746
gDatabaseArray->set(db->myDatabaseID, RETAIN(db));
1748
PBMSSystemTables::loadSystemTables(RETAIN(db));
1755
uint32_t MSDatabase::getDatabaseID(CSString *db_name, bool create)
1763
lock_(gDatabaseList);
1764
if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1765
db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1768
id = db->myDatabaseID;
1771
id = db->myDatabaseID;
1774
unlock_(gDatabaseList);
1780
uint32_t MSDatabase::getDatabaseID(const char *db_name, bool create)
1782
return getDatabaseID(CSString::newString(db_name), create);
1785
MSDatabase *MSDatabase::getBackupDatabase(CSString *db_location, CSString *db_name, uint32_t db_id, bool create)
1787
bool was_created = create;
1794
// If the db already exists and 'create' == true then the existing db
1796
// Create the database path, if 'create' == false then it can return NULL
1797
path = createDatabasePath(db_location->getCString(), RETAIN(db_name), &db_id, &was_created);
1799
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1803
// If we wanted to create it but it already exists then throw an error.
1804
if ( create && !was_created) {
1806
snprintf(str, 120, "Duplicate database: %s", db_name->getCString());
1807
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, str);
1812
// everything looks OK
1813
db = newDatabase(db_location->getCString(), db_name, db_id, create);
1814
db->setBackupDatabase();
1815
release_(db_location);