1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
32
#include <drizzled/common.h>
33
#include <drizzled/session.h>
34
#include <drizzled/table.h>
35
#include <drizzled/message/table.pb.h>
36
#include <drizzled/charset.h>
37
#include <drizzled/table_proto.h>
38
#include <drizzled/field.h>
41
#include "cslib/CSConfig.h"
49
#include "cslib/CSGlobal.h"
50
#include "cslib/CSLog.h"
51
#include "cslib/CSDirectory.h"
52
#include "cslib/CSStrUtil.h"
54
#include "database_ms.h"
55
#include "open_table_ms.h"
56
#include "backup_ms.h"
58
#include "temp_log_ms.h"
59
#include "network_ms.h"
62
#include "transaction_ms.h"
63
//#include "systab_variable_ms.h"
64
#include "systab_httpheader_ms.h"
65
#include "parameters_ms.h"
66
#include "pbmsdaemon_ms.h"
70
CSSyncSortedList *MSDatabase::gDatabaseList;
71
CSSparseArray *MSDatabase::gDatabaseArray;
73
* -------------------------------------------------------------------------
77
MSDatabase::MSDatabase():
83
myCompactorThread(NULL),
84
myTempLogThread(NULL),
85
myRepostoryList(NULL),
87
myBlobType(MS_STANDARD_STORAGE),
92
#ifdef HAVE_ALIAS_SUPPORT
106
MSDatabase::~MSDatabase()
110
iBackupThread->stop();
111
iBackupThread->release();
112
iBackupThread = NULL;
115
if (myTempLogThread) {
116
myTempLogThread->stop();
117
myTempLogThread->release();
118
myTempLogThread = NULL;
120
if (myCompactorThread) {
121
myRepostoryList->wakeup(); // The compator thread waits on this.
122
myCompactorThread->stop();
123
myCompactorThread->release();
124
myCompactorThread = NULL;
128
myDatabasePath->release();
131
myDatabaseName->release();
133
iWriteTempLog = NULL;
134
if (myTempLogArray) {
135
myTempLogArray->clear();
136
myTempLogArray->release();
140
iTableList->release();
143
iTableArray->clear();
144
iTableArray->release();
146
if (myRepostoryList) {
147
myRepostoryList->clear();
148
myRepostoryList->release();
150
#ifdef HAVE_ALIAS_SUPPORT
152
iBlobAliases->ma_close();
153
iBlobAliases->release();
157
myBlobCloud->release();
161
/* Extract the numeric ID embedded in a file name.
 * The name is scanned backwards for its last '-' and the number is
 * parsed with sscanf() from that position. The text in front of the '-'
 * is compared (length and content) against 'name_part'.
 * NOTE(review): the handling of a mismatch, of a missing 'name_part', and
 * the return statement are not visible in this excerpt - confirm against
 * the full function.
 */
uint32_t MSDatabase::fileToTableId(const char *file_name, const char *name_part)
166
const char *num = file_name + strlen(file_name) - 1;
168
while (num >= file_name && *num != '-')
171
/* Check the name part of the file: */
172
int len = strlen(name_part);
174
if (len != num - file_name)
176
if (strncmp(file_name, name_part, len) != 0)
181
/* A space is required between the string literal and PRIu32: without it
 * C++11 and later treat 'PRIu32' as a user-defined-literal suffix.
 */
sscanf(num, "%" PRIu32, &value);
186
const char *MSDatabase::fileToTableName(size_t size, char *tab_name, const char *file_name)
191
file_name = cs_last_name_of_path(file_name);
192
cptr = file_name + strlen(file_name) - 1;
193
while (cptr > file_name && *cptr != '.')
195
if (cptr > file_name && *cptr == '.') {
196
if (strncmp(cptr, ".bs", 2) == 0) {
198
while (cptr > file_name && isdigit(*cptr))
203
len = cptr - file_name;
207
memcpy(tab_name, file_name, len);
210
/* Return a pointer to what was removed! */
211
return file_name + len;
215
const char *MSDatabase::getDatabaseNameCString()
217
return myDatabaseName->getCString();
220
MSTable *MSDatabase::getTable(CSString *tab_name, bool create)
227
if (!(tab = (MSTable *) iTableList->find(tab_name))) {
230
/* Create a new table: */
231
tab = MSTable::newTable(iMaxTableID+1, RETAIN(tab_name), this, (off64_t) 0, false);
232
iTableList->add(tab);
233
iTableArray->set(iMaxTableID+1, RETAIN(tab));
244
MSTable *MSDatabase::getTable(const char *tab_name, bool create)
246
return getTable(CSString::newString(tab_name), create);
250
MSTable *MSDatabase::getTable(uint32_t tab_id, bool missing_ok)
256
if (!(tab = (MSTable *) iTableArray->get((uint32_t) tab_id))) {
261
char buffer[CS_EXC_MESSAGE_SIZE];
263
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown table #");
264
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) tab_id);
265
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " in database ");
266
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, getDatabaseNameCString());
267
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, buffer);
274
MSTable *MSDatabase::getNextTable(uint32_t *pos)
281
while (i < iTableList->getSize()) {
282
tab = (MSTable *) iTableList->itemAt(i++);
283
if (!tab->isToDelete())
294
void MSDatabase::addTable(uint32_t tab_id, const char *tab_name, off64_t file_size, bool to_delete)
298
if (tab_id > iMaxTableID)
299
iMaxTableID = tab_id;
300
tab = MSTable::newTable(tab_id, tab_name, this, file_size, to_delete);
301
iTableList->add(tab);
302
iTableArray->set(tab_id, RETAIN(tab));
305
void MSDatabase::addTableFromFile(CSDirectory *dir, const char *file_name, bool to_delete)
309
char tab_name[MS_TABLE_NAME_SIZE];
311
dir->info(NULL, &file_size, NULL);
312
file_id = fileToTableId(file_name);
313
fileToTableName(MS_TABLE_NAME_SIZE, tab_name, file_name);
314
addTable(file_id, tab_name, file_size, to_delete);
317
void MSDatabase::removeTable(MSTable *tab)
322
iTableList->remove(tab->myTableName);
323
iTableArray->remove(tab->myTableID);
329
void MSDatabase::dropTable(MSTable *tab)
334
iTableList->remove(tab->myTableName);
335
iTableArray->remove(tab->myTableID);
337
// Cute: you drop the table by adding it with the 'to_delete' flag set to 'true'
338
addTable(tab->myTableID, tab->myTableName->getCString(), tab->getTableFileSize(), true);
345
// This function is used when dropping tables from a database before
346
// dropping the database itself.
347
CSString *MSDatabase::getATableName()
351
CSString *name = NULL;
356
while ((tab = (MSTable *) iTableList->itemAt(i++)) && tab->isToDelete()) ;
358
name = tab->getTableName();
365
uint32_t MSDatabase::getTableCount()
367
uint32_t cnt = 0, i = 0;
373
while ((tab = (MSTable *) iTableList->itemAt(i++))) {
374
if (!tab->isToDelete())
383
void MSDatabase::renameTable(MSTable *tab, const char *to_name)
387
iTableList->remove(tab->myTableName);
388
iTableArray->remove(tab->myTableID);
390
addTable(tab->myTableID, to_name, tab->getTableFileSize(), false);
396
void MSDatabase::openWriteRepo(MSOpenTable *otab)
398
if (otab->myWriteRepo && otab->myWriteRepoFile)
402
if (!otab->myWriteRepo)
403
otab->myWriteRepo = lockRepo(0);
405
/* Now open the repo file for the open table: */
406
otab->myWriteRepo->openRepoFileForWriting(otab);
410
MSRepository *MSDatabase::getRepoFullOfTrash(time_t *ret_wait_time)
412
MSRepository *repo = NULL;
413
time_t wait_time = 0;
416
wait_time = *ret_wait_time;
418
lock_(myRepostoryList);
419
for (uint32_t i=0; i<myRepostoryList->size(); i++) {
421
if ((repo = (MSRepository *) myRepostoryList->get(i))) {
422
if (!repo->isRemovingFP && !repo->mustBeDeleted && !repo->isRepoLocked()) {
423
if (!repo->myRepoHeadSize) {
424
/* The file has not yet been opened, so the
425
* garbage count will not be known!
427
MSRepoFile *repo_file;
430
unlock_(myRepostoryList);
432
repo_file = repo->openRepoFile();
433
repo_file->release();
435
lock_(myRepostoryList);
438
if (repo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
439
/* Make sure there are no temp BLOBs in this repository that have
442
time_t now = time(NULL);
443
time_t then = repo->myLastTempTime;
445
/* Check if there are any temp BLOBs to be removed: */
446
if (now > (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
447
repo->lockRepo(REPO_COMPACTING);
452
/* There are temp BLOBs to wait for... */
453
if (!wait_time || wait_time > MSTempLog::adjustWaitTime(then, now))
454
wait_time = MSTempLog::adjustWaitTime(then, now);
461
unlock_(myRepostoryList);
463
*ret_wait_time = wait_time;
467
MSRepository *MSDatabase::lockRepo(off64_t size)
473
lock_(myRepostoryList);
474
free_slot = myRepostoryList->size();
475
/* Find an unlocked repository file that is below the write threshold: */
476
for (uint32_t i=0; i<myRepostoryList->size(); i++) {
477
if ((repo = (MSRepository *) myRepostoryList->get(i))) {
478
if ((!repo->isRepoLocked()) && (!repo->isRemovingFP) && (!repo->mustBeDeleted) &&
479
((repo->myRepoFileSize + size) < PBMSParameters::getRepoThreshold())
480
/**/ && (repo->getGarbageLevel() < PBMSParameters::getGarbageThreshold()))
489
/* None found, create a new repo file: */
490
new_(repo, MSRepository(free_slot + 1, this, 0));
491
myRepostoryList->set(free_slot, repo);
495
repo->lockRepo(REPO_WRITE); // <- The MSRepository::backToPool() will unlock this.
496
unlock_(myRepostoryList);
500
MSRepoFile *MSDatabase::getRepoFileFromPool(uint32_t repo_id, bool missing_ok)
506
lock_(myRepostoryList);
507
if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1))) {
509
char buffer[CS_EXC_MESSAGE_SIZE];
511
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown repository file: ");
512
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
513
CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
515
unlock_(myRepostoryList);
518
if (repo->isRemovingFP) {
519
char buffer[CS_EXC_MESSAGE_SIZE];
521
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Repository will be removed: ");
522
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
523
CSException::throwException(CS_CONTEXT, MS_ERR_REMOVING_REPO, buffer);
525
repo->retain(); /* Release is here: [++] */
526
file = repo->getRepoFile();
527
unlock_(myRepostoryList);
530
file = repo->openRepoFile();
531
lock_(myRepostoryList);
532
repo->addRepoFile(RETAIN(file));
533
unlock_(myRepostoryList);
538
void MSDatabase::returnRepoFileToPool(MSRepoFile *file)
543
lock_(myRepostoryList);
545
if ((repo = file->myRepo)) {
546
if (repo->isRemovingFP) {
547
repo->removeRepoFile(file); // No retain expected
548
myRepostoryList->wakeup();
551
repo->returnRepoFile(file); // No retain expected
552
repo->release(); /* [++] here is the release. */
555
unlock_(myRepostoryList);
559
void MSDatabase::removeRepo(uint32_t repo_id, bool *mustQuit)
564
lock_(myRepostoryList);
565
while ((!mustQuit || !*mustQuit) && !iClosing) {
566
if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1)))
568
repo->isRemovingFP = true;
569
if (repo->removeRepoFilesNotInUse()) {
570
myRepostoryList->set(repo_id - 1, NULL);
574
* Wait for the files that are in use to be
577
myRepostoryList->wait();
579
unlock_(myRepostoryList);
583
void MSDatabase::queueTempLogEvent(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
584
uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time)
586
MSTempLogItemRec item;
589
// Each otab object holds a handle to an instance of an OPEN
590
// temp log. This is so that each thread has its own open temp log
591
// and doesn't need to open and close it constantly.
594
lock_(myTempLogArray);
595
if (!iWriteTempLog) {
596
iWriteTempLog = (MSTempLog *) myTempLogArray->last();
597
if (!iWriteTempLog) {
598
new_(iWriteTempLog, MSTempLog(1, this, 0));
599
myTempLogArray->set(1, iWriteTempLog);
602
if (!otab->myTempLogFile)
603
otab->myTempLogFile = iWriteTempLog->openTempLog();
604
else if (otab->myTempLogFile->myTempLogID != iWriteTempLog->myLogID) {
605
otab->myTempLogFile->release();
606
otab->myTempLogFile = NULL;
607
otab->myTempLogFile = iWriteTempLog->openTempLog();
610
if (iWriteTempLog->myTempLogSize >= PBMSParameters::getTempLogThreshold()) {
611
uint32_t tmp_log_id = iWriteTempLog->myLogID + 1;
613
new_(iWriteTempLog, MSTempLog(tmp_log_id, this, 0));
614
myTempLogArray->set(tmp_log_id, iWriteTempLog);
616
otab->myTempLogFile->release();
617
otab->myTempLogFile = NULL;
618
otab->myTempLogFile = iWriteTempLog->openTempLog();
623
*log_id = iWriteTempLog->myLogID;
624
*log_offset = (uint32_t) iWriteTempLog->myTempLogSize;
627
iWriteTempLog->myTempLogSize += iWriteTempLog->myTemplogRecSize;
628
unlock_(myTempLogArray);
630
CS_SET_DISK_1(item.ti_type_1, type);
631
CS_SET_DISK_4(item.ti_table_id_4, tab_id);
632
CS_SET_DISK_6(item.ti_blob_id_6, blob_id);
633
CS_SET_DISK_4(item.ti_auth_code_4, auth_code);
634
CS_SET_DISK_4(item.ti_time_4, timev);
635
otab->myTempLogFile->write(&item, *log_offset, sizeof(MSTempLogItemRec));
641
#ifdef HAVE_ALIAS_SUPPORT
642
void MSDatabase::queueForDeletion(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
643
uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time, MSDiskAliasPtr aliasDiskRec)
647
queueTempLogEvent(otab, type, tab_id, blob_id, auth_code, log_id, log_offset, q_time);
649
// If it has an alias, remove it from the alias index.
652
deleteBlobAlias(aliasDiskRec);
655
self->logException();
663
MSTempLogFile *MSDatabase::openTempLogFile(uint32_t log_id, size_t *log_rec_size, size_t *log_head_size)
666
MSTempLogFile *log_file = NULL;
669
lock_(myTempLogArray);
671
log = (MSTempLog *) myTempLogArray->get(log_id);
673
log = (MSTempLog *) myTempLogArray->first();
675
log_file = log->openTempLog();
677
*log_rec_size = log->myTemplogRecSize;
679
*log_head_size = log->myTempLogHeadSize;
681
unlock_(myTempLogArray);
685
uint32_t MSDatabase::getTempLogCount()
690
lock_(myTempLogArray);
691
count = myTempLogArray->size();
692
unlock_(myTempLogArray);
696
void MSDatabase::removeTempLog(uint32_t log_id)
699
lock_(myTempLogArray);
700
myTempLogArray->remove(log_id);
701
unlock_(myTempLogArray);
705
CSObject *MSDatabase::getKey()
707
return (CSObject *) myDatabaseName;
710
int MSDatabase::compareKey(CSObject *key)
712
return myDatabaseName->compare((CSString *) key);
715
MSCompactorThread *MSDatabase::getCompactorThread()
717
return myCompactorThread;
720
CSSyncVector *MSDatabase::getRepositoryList()
722
return myRepostoryList;
725
#ifdef HAVE_ALIAS_SUPPORT
726
uint32_t MSDatabase::registerBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
729
bool can_retry = true;
733
lock_(&iBlobAliaseLock);
736
hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
740
unlock_(&iBlobAliaseLock);
742
// It can be that a duplicate alias exists that was deleted
743
// but the transaction has not been written to the repository yet.
744
// Flush all committed transactions to the repository file.
745
MSTransactionManager::flush();
753
unlock_(&iBlobAliaseLock);
757
uint32_t MSDatabase::updateBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t old_alias_hash, const char *alias)
761
lock_(&iBlobAliaseLock);
763
new_hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
764
iBlobAliases->deleteAlias(repo_id, repo_offset, old_alias_hash);
766
unlock_(&iBlobAliaseLock);
770
void MSDatabase::deleteBlobAlias(MSDiskAliasPtr diskRec)
773
lock_(&iBlobAliaseLock);
774
iBlobAliases->deleteAlias(diskRec);
775
unlock_(&iBlobAliaseLock);
779
void MSDatabase::deleteBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
781
MSDiskAliasRec diskRec;
783
CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);
784
CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);
785
CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
786
deleteBlobAlias(&diskRec);
789
void MSDatabase::moveBlobAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
792
lock_(&iBlobAliaseLock);
793
iBlobAliases->resetAlias(old_repo_id, old_repo_offset, alias_hash, new_repo_id, new_repo_offset);
794
unlock_(&iBlobAliaseLock);
799
bool MSDatabase::isValidHeaderField(const char *name)
801
bool is_valid = false;
806
if (strcasecmp(name, MS_ALIAS_TAG)) {
807
lock_(&iHTTPMetaDataHeaders);
808
header = CSString::newString(name);
811
is_valid = (iHTTPMetaDataHeaders.find(header) != NULL);
814
unlock_(&iHTTPMetaDataHeaders);
822
void MSDatabase::startUp(const char *default_http_headers)
826
new_(gDatabaseList, CSSyncSortedList);
827
new_(gDatabaseArray, CSSparseArray(5));
828
MSHTTPHeaderTable::setDefaultMetaDataHeaders(default_http_headers);
829
PBMSSystemTables::systemTablesStartUp();
830
PBMSParameters::setBackupDatabaseID(1);
834
void MSDatabase::stopThreads()
840
lock_(gDatabaseList);
842
if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
846
if (db->myTempLogThread) {
847
db->myTempLogThread->stop();
848
db->myTempLogThread->release();
849
db->myTempLogThread = NULL;
851
if (db->myCompactorThread) {
852
db->myRepostoryList->wakeup(); // The compator thread waits on this.
853
db->myCompactorThread->stop();
854
db->myCompactorThread->release();
855
db->myCompactorThread = NULL;
858
if (db->iBackupThread) {
859
db->iBackupThread->stop();
860
db->iBackupThread->release();
861
db->iBackupThread = NULL;
866
unlock_(gDatabaseList);
871
void MSDatabase::shutDown()
874
if (gDatabaseArray) {
875
gDatabaseArray->clear();
876
gDatabaseArray->release();
877
gDatabaseArray = NULL;
881
gDatabaseList->clear();
882
gDatabaseList->release();
883
gDatabaseList = NULL;
886
MSHTTPHeaderTable::releaseDefaultMetaDataHeaders();
887
PBMSSystemTables::systemTableShutDown();
890
void MSDatabase::setBackupDatabase()
893
// I need to give the backup database a unique fake database ID.
894
// This is so that it is not confused with the database being backed
895
// up when opening tables.
897
// Normally database IDs are generated by time(NULL) so small database IDs
898
// are safe to use as fake IDs.
900
lock_(gDatabaseList);
901
myDatabaseID = PBMSParameters::getBackupDatabaseID() +1;
902
PBMSParameters::setBackupDatabaseID(myDatabaseID);
903
gDatabaseArray->set(myDatabaseID, RETAIN(this));
906
// Notify the cloud storage, if any, that it is a backup.
907
// This is important because if the backup database is dropped
908
// we need to be sure that only the BLOBs belonging to the
909
// backup are removed from the cloud.
910
myBlobCloud->cl_setCloudIsBackup();
912
unlock_(gDatabaseList);
914
// Rename the database path so that it is obvious that this is an incomplete backup database.
915
// When the backup is completed it will be renamed back.
916
CSPath *new_path = CSPath::newPath(myDatabasePath->concat("#"));
919
if (new_path->exists())
922
CSPath *db_path = CSPath::newPath(RETAIN(myDatabasePath));
925
db_path->rename(new_path->getNameCString());
926
myDatabasePath->release();
927
myDatabasePath = new_path->getString();
928
myDatabasePath->retain();
937
void MSDatabase::releaseBackupDatabase()
942
// The backup has completed successfully, rename the path to the correct name.
943
CSPath *db_path = CSPath::newPath(myDatabasePath->getCString());
946
myDatabasePath->setLength(myDatabasePath->length()-1);
947
db_path->rename(cs_last_name_of_path(myDatabasePath->getCString()));
950
// Remove the backup database object.
951
lock_(gDatabaseList);
952
gDatabaseArray->remove(myDatabaseID);
953
MSTableList::removeDatabaseTables(this); // Will also release the database object.
954
unlock_(gDatabaseList);
960
void MSDatabase::startBackup(MSBackupInfo *backup_info)
966
if (iBackupThread->isRunning()) {
967
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, "A backup is still running.");
969
iBackupThread->release();
970
iBackupThread = NULL;
974
iBackupThread = MSBackup::newMSBackup(backup_info);
977
iBackupThread->startBackup(RETAIN(this));
981
iBackupThread->release();
982
iBackupThread = NULL;
990
/* Report the progress of this database's backup.
 * When a backup thread is in use:
 *   *total        <- iBackupThread->getBackupSize()
 *   *completed    <- iBackupThread->getBackupCompletedSize()
 *   *completed_ok <- true if iBackupThread->getStatus() is 0
 * Otherwise the backup is reported as finished successfully with both
 * sizes set to zero.
 * NOTE(review): the guard selecting between the two cases, the locking and
 * the return statement are not visible in this excerpt; presumably the
 * function returns 'done' - confirm against the full function.
 */
bool MSDatabase::backupStatus(uint64_t *total, uint64_t *completed, bool *completed_ok)
997
*total = iBackupThread->getBackupSize();
998
*completed = iBackupThread->getBackupCompletedSize();
999
done = !iBackupThread->isRunning();
1000
/* The status flag belongs in 'completed_ok'; writing it to 'completed'
 * would clobber the byte count stored above and leave 'completed_ok' unset.
 */
*completed_ok = (iBackupThread->getStatus() == 0);
1002
*completed_ok = done = true;
1003
*total = *completed = 0;
1009
uint32_t MSDatabase::backupID()
1011
return (iBackupThread)?iBackupThread->backupID(): 0;
1014
void MSDatabase::terminateBackup()
1016
if (iBackupThread) {
1017
iBackupThread->stop();
1018
iBackupThread->release();
1019
iBackupThread = NULL;
1023
MSDatabase *MSDatabase::getDatabase(CSString *db_name, bool create)
1030
lock_(gDatabaseList);
1031
if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1032
db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1039
unlock_(gDatabaseList);
1044
MSDatabase *MSDatabase::getDatabase(const char *db_name, bool create)
1046
return getDatabase(CSString::newString(db_name), create);
1049
MSDatabase *MSDatabase::getDatabase(uint32_t db_id, bool missing_ok)
1054
lock_(gDatabaseList);
1055
if ((db = (MSDatabase *) gDatabaseArray->get((uint32_t) db_id)))
1058
// Look for the database folder with the correct ID:
1059
CSPath *path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1061
if (path->exists()) {
1063
dir = CSDirectory::newDirectory(RETAIN(path));
1067
while (dir->next() && !db) {
1068
if (!dir->isFile()) {
1069
const char *ptr, *dir_name = dir->name();
1070
ptr = dir_name + strlen(dir_name) -1;
1072
while (ptr > dir_name && *ptr != '-') ptr--;
1075
int len = ptr - dir_name;
1077
if ((strtoul(ptr, NULL, 10) == db_id) && len) {
1078
db = getDatabase(CSString::newString(dir_name, len), true);
1079
ASSERT(db->myDatabaseID == db_id);
1088
unlock_(gDatabaseList);
1090
if ((!db) && !missing_ok) {
1091
char buffer[CS_EXC_MESSAGE_SIZE];
1093
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database #");
1094
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) db_id);
1095
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
1100
void MSDatabase::wakeTempLogThreads()
1107
lock_(gDatabaseList);
1108
for (int i=0;;i++) {
1109
if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
1111
if (db->myTempLogThread)
1112
db->myTempLogThread->wakeup();
1114
unlock_(gDatabaseList);
1118
uint32_t MSDatabase::getDBID(CSPath *path, CSString *db_name)
1122
int len = db_name->length();
1129
// Search for the ID of the database
1130
dir = CSDirectory::newDirectory(RETAIN(path));
1133
while (dir->next() && !db_id)
1135
if (!dir->isFile()){
1136
ptr = dir->name() + strlen(dir->name()) -1;
1137
while (ptr > dir->name() && isdigit(*ptr)) ptr--;
1138
if ((*ptr == '-') && (len == (ptr - dir->name())) && !db_name->compare(dir->name(), len) ) {
1139
db_id = atol(ptr+1);
1148
while (1) { // search for a unique db_id
1149
dir = CSDirectory::newDirectory(RETAIN(path));
1152
while (db_id && dir->next()) {
1153
if (!dir->isFile()) {
1154
ptr = dir->name() + strlen(dir->name()) -1;
1155
while (ptr > dir->name() && isdigit(*ptr)) ptr--;
1156
if ((*ptr == '-') && (db_id == strtoul(ptr+1, NULL, 10))) {
1164
sleep(1); // Allow 1 second to pass.
1174
CSPath *MSDatabase::createDatabasePath(const char *location, CSString *db_name, uint32_t *db_id_ptr, bool *create, bool is_pbms)
1176
bool create_path = *create;
1177
CSPath *path = NULL;
1178
char name_buffer[MS_DATABASE_NAME_SIZE + 40];
1184
path = CSPath::newPath(location, "pbms");
1186
if (!path->exists()) {
1197
// If this is the pbms database then nothing more is to be done.
1201
if ((!db_id_ptr) || !*db_id_ptr) {
1202
db_id = getDBID(RETAIN(path), RETAIN(db_name));
1208
// Create the PBMS database name with ID
1209
cs_strcpy(MS_DATABASE_NAME_SIZE + 40, name_buffer, db_name->getCString());
1210
cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, "-");
1211
cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, (uint32_t) db_id);
1214
path = CSPath::newPath(path, name_buffer);
1216
if (!path->exists()) {
1236
MSDatabase *MSDatabase::newDatabase(const char *db_location, CSString *db_name, uint32_t db_id, bool create)
1238
MSDatabase *db = NULL;
1242
const char *file_name;
1246
uint32_t to_delete = 0;
1248
bool is_pbms = false;
1254
//is_pbms = (strcmp(db_name->getCString(), "pbms") == 0); To be done later.
1257
* Block the creation of the pbms database if there is no MySQL database.
1258
* The database name is case sensitive here if the file system names are
1259
* case sensitive. This is desirable.
1261
path = CSPath::newPath(ms_my_get_mysql_home_path(), RETAIN(db_name));
1263
if (create && !path->exists()) {
1264
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1268
// Create the database path, if 'create' == false then it can return NULL
1269
path = createDatabasePath(db_location, RETAIN(db_name), &db_id, &create, is_pbms);
1276
// Create the database object and initialize it.
1277
if (!(db = new MSDatabase())) {
1278
CSException::throwOSError(CS_CONTEXT, ENOMEM);
1281
db->myIsPBMS = is_pbms;
1282
db_path = path->getString();
1287
db->iNextBlobRefId = (uint32_t) time(NULL);
1288
db->iNextBlobRefId <<= 32;
1289
db->iNextBlobRefId = COMMIT_MASK(db->iNextBlobRefId);
1290
db->iNextBlobRefId++;
1292
db->myDatabaseID = db_id;
1293
db->myDatabasePath = db_path;
1294
db->myDatabaseName = db_name;
1295
new_(db->myBlobCloud, CloudDB(db_id));
1302
new_(db->myTempLogArray, CSSyncSparseArray(20));
1303
new_(db->iTableList, CSSyncSortedList());
1304
new_(db->iTableArray, CSSparseArray(20));
1305
new_(db->myRepostoryList, CSSyncVector(20));
1308
#ifdef HAVE_ALIAS_SUPPORT
1309
//db->retain(); no retain here, MSAlias() takes a back ref.
1310
new_(db->iBlobAliases, MSAlias(db));
1312
/* "Load" the database: */
1314
/* Get the max table ID: */
1315
dir = CSDirectory::newDirectory(RETAIN(db_path));
1318
while (dir->next()) {
1319
file_name = dir->name();
1320
if (dir->isFile() && cs_is_extension(file_name, "bst"))
1321
db->addTableFromFile(dir, file_name, false);
1325
path = CSPath::newPath(RETAIN(db_path), "bs-repository");
1326
if (path->exists()) {
1327
dir = CSDirectory::newDirectory(path);
1330
while (dir->next()) {
1331
file_name = dir->name();
1332
if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1333
if ((file_id = fileToTableId(file_name, "repo"))) {
1334
dir->info(NULL, &file_size, NULL);
1335
new_(repo, MSRepository(file_id, db, file_size));
1336
db->myRepostoryList->set(file_id - 1, repo);
1347
path = CSPath::newPath(RETAIN(db_path), "bs-logs");
1348
if (path->exists()) {
1349
dir = CSDirectory::newDirectory(path);
1352
while (dir->next()) {
1353
file_name = dir->name();
1354
if (dir->isFile()) {
1355
if (cs_is_extension(file_name, "bs")) {
1356
if ((file_id = fileToTableId(file_name, "temp"))) {
1357
dir->info(NULL, &file_size, NULL);
1358
new_(log, MSTempLog(file_id, db, file_size));
1359
db->myTempLogArray->set(file_id, log);
1362
else if (cs_is_extension(file_name, "bst")) {
1363
db->addTableFromFile(dir, file_name, true);
1376
/* Go through and prepare all the tables that are to
1382
while ((tab = (MSTable *) db->iTableList->itemAt(i))) {
1383
if (tab->isToDelete())
1384
tab->prepareToDelete();
1395
void MSDatabase::startThreads()
1402
#ifdef HAVE_ALIAS_SUPPORT
1403
// iBlobAliases->ma_open() must be called before starting any threads.
1404
iBlobAliases->ma_open();
1407
new_(myTempLogThread, MSTempLogThread(1 * 1000, this));
1408
myTempLogThread->start();
1410
#ifdef MS_COMPACTOR_POLLS
1411
new_(myCompactorThread, MSCompactorThread(MS_COMPACTOR_POLL_FREQ, this));
1413
new_(myCompactorThread, MSCompactorThread(MS_DEFAULT_COMPACTOR_WAIT * 1000, this));
1417
myCompactorThread->start();
1422
void MSDatabase::dropDatabase()
1429
if (iBackupThread) {
1430
iBackupThread->stop();
1431
iBackupThread->release();
1432
iBackupThread = NULL;
1435
if (myTempLogThread) {
1436
myTempLogThread->stop();
1437
myTempLogThread->release();
1438
myTempLogThread = NULL;
1441
if (myCompactorThread) {
1442
myRepostoryList->wakeup(); // The compator thread waits on this.
1443
myCompactorThread->stop();
1444
myCompactorThread->release();
1445
myCompactorThread = NULL;
1448
// Call cloud drop database even if the database is not currently
1449
// using cloud storage, just in case it was in the past. If the connection
1450
// to the cloud is not setup then nothing will be done.
1452
myBlobCloud->cl_dropDB();
1455
self->logException();
1461
void MSDatabase::removeDatabasePath(CSString *doomedDatabasePath )
1463
CSPath *path = NULL;
1464
CSDirectory *dir = NULL;
1465
const char *file_name;
1468
push_(doomedDatabasePath);
1470
// Delete repository files
1471
path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-repository");
1473
if (path->exists()) {
1474
dir = CSDirectory::newDirectory(RETAIN(path));
1477
while (dir->next()) {
1478
file_name = dir->name();
1479
if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1484
if (path->isEmpty())
1489
// Delete temp log files.
1490
path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-logs");
1492
if (path->exists()) {
1493
dir = CSDirectory::newDirectory(RETAIN(path));
1496
while (dir->next()) {
1497
file_name = dir->name();
1498
if (dir->isFile() && (cs_is_extension(file_name, "bs") || cs_is_extension(file_name, "bst"))) {
1503
if (path->isEmpty())
1508
// Delete table reference files.
1509
dir = CSDirectory::newDirectory(RETAIN(doomedDatabasePath));
1512
while (dir->next()) {
1513
file_name = dir->name();
1514
if (dir->isFile() && cs_is_extension(file_name, "bst"))
1519
#ifdef HAVE_ALIAS_SUPPORT
1520
path = CSPath::newPath(RETAIN(doomedDatabasePath), ACTIVE_ALIAS_INDEX);
1526
PBMSSystemTables::removeSystemTables(RETAIN(doomedDatabasePath));
1528
path = CSPath::newPath(RETAIN(doomedDatabasePath));
1530
if (path->isEmpty() && !path->isLink()) {
1533
CSStringBuffer *new_name;
1534
// If the database folder is not empty we rename it to get it out of the way.
1535
// If it is not renamed it will be reused if a database with the same name is
1536
// created again, which will result in the database ID being reused which may
1537
// have some bad side effects.
1538
new_(new_name, CSStringBuffer());
1540
new_name->append(cs_last_name_of_path(doomedDatabasePath->getCString()));
1541
new_name->append("_DROPPED");
1542
path->rename(new_name->getCString());
1547
release_(doomedDatabasePath);
1549
path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1551
if (path->isEmpty() && !path->isLink()) {
1559
/* Drop the PBMS database if it exists.
1560
* The root folder 'pbms' will be deleted also
1561
* if it is empty and not a symbolic link.
1562
* The database folder in 'pbms' is deleted if it is empty and
1563
* it is not a symbolic link.
1565
void MSDatabase::dropDatabase(MSDatabase *doomedDatabase, const char *db_name )
1567
CSString *doomedDatabasePath = NULL;
1571
if (doomedDatabase) {
1572
push_(doomedDatabase);
1574
// Remove any pending transactions for the dropped database.
1575
// This is important because if the database is restored it will have the
1576
// same database ID and the old transactions would be applied to it.
1577
MSTransactionManager::dropDatabase(doomedDatabase->myDatabaseID);
1579
doomedDatabasePath = doomedDatabase->myDatabasePath;
1580
doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1582
MSTableList::removeDatabaseTables(RETAIN(doomedDatabase));
1583
MSSystemTableShare::removeDatabaseSystemTables(RETAIN(doomedDatabase));
1585
doomedDatabase->dropDatabase(); // Shutdown database threads.
1587
// To avoid a deadlock a lock is not taken on the database list
1588
// if shutdown is in progress. The only database that would be
1589
// dropped during a shutdown is an incomplete backup database.
1590
ASSERT(doomedDatabase->isBackup || !self->myMustQuit);
1591
if (!self->myMustQuit)
1592
lock_(gDatabaseList); // Be sure to shutdown the database before locking this or it can lead to deadlocks
1594
gDatabaseArray->remove(doomedDatabase->myDatabaseID);
1595
if (!doomedDatabase->isBackup)
1596
gDatabaseList->remove(doomedDatabase->getKey());
1597
if (!self->myMustQuit)
1598
unlock_(gDatabaseList);
1599
ASSERT(doomedDatabase->getRefCount() == 1);
1600
release_(doomedDatabase);
1604
bool create = false;
1607
path = createDatabasePath(ms_my_get_mysql_home_path(), CSString::newString(db_name), &db_id, &create);
1610
MSTransactionManager::dropDatabase(db_id);
1613
doomedDatabasePath = path->getString();
1614
doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1619
if (doomedDatabasePath)
1620
removeDatabasePath(doomedDatabasePath);
1625
void MSDatabase::dropDatabase(const char *db_name )
1628
dropDatabase(getDatabase(db_name, false), db_name);
1632
// The table_path can be several things here:
1633
// 1: <absolute path>/<database>/<table>
1634
// 2: <absolute path>/<database>
1635
// 3: <database>/<table>
1636
bool MSDatabase::convertTablePathToIDs(const char *table_path, uint32_t *db_id, uint32_t *tab_id, bool create)
1638
const char *base = ms_my_get_mysql_home_path();
1639
CSString *table_url;
1640
CSString *db_path = NULL;
1641
CSString *db_name = NULL;
1642
CSString *tab_name = NULL;
1649
table_url = CSString::newString(table_path);
1650
if (table_url->startsWith(base)) {
1651
table_url = table_url->right(base);
1656
db_path = table_url->left("/", -1);
1658
tab_name = table_url->right("/", -1);
1661
release_(table_url);
1663
if (db_path->length() == 0) { // Only a database name was supplied.
1668
if (tab_name->length() == 0) {
1669
tab_name->release();
1674
db_name = db_path->right("/", -1);
1676
if (db_name->length() == 0) {
1685
db = MSDatabase::getDatabase(db_name, create); // This will release db_name
1687
*db_id = db->myDatabaseID;
1692
tab = db->getTable(tab_name, create);// This will release tab_name
1695
*tab_id = tab->myTableID;
1703
return_((*tab_id > 0) && (*db_id > 0));
1706
bool MSDatabase::convertTableAndDatabaseToIDs(const char *db_name, const char *tab_name, uint32_t *db_id, uint32_t *tab_id, bool create)
1714
db = MSDatabase::getDatabase(db_name, create);
1717
*db_id = db->myDatabaseID;
1720
tab = db->getTable(tab_name, create);
1722
*tab_id = tab->myTableID;
1730
return_((*tab_id > 0) && (*db_id > 0));
1733
MSDatabase *MSDatabase::loadDatabase(CSString *db_name, bool create)
1738
db = newDatabase(ms_my_get_mysql_home_path(), db_name, 0, create);
1742
gDatabaseList->add(RETAIN(db));
1744
gDatabaseArray->set(db->myDatabaseID, RETAIN(db));
1746
PBMSSystemTables::loadSystemTables(RETAIN(db));
1753
uint32_t MSDatabase::getDatabaseID(CSString *db_name, bool create)
1761
lock_(gDatabaseList);
1762
if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1763
db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1766
id = db->myDatabaseID;
1769
id = db->myDatabaseID;
1772
unlock_(gDatabaseList);
1778
uint32_t MSDatabase::getDatabaseID(const char *db_name, bool create)
1780
return getDatabaseID(CSString::newString(db_name), create);
1783
MSDatabase *MSDatabase::getBackupDatabase(CSString *db_location, CSString *db_name, uint32_t db_id, bool create)
1785
bool was_created = create;
1792
// If the db already exists and 'create' == true then the existing db
1794
// Create the database path, if 'create' == false then it can return NULL
1795
path = createDatabasePath(db_location->getCString(), RETAIN(db_name), &db_id, &was_created);
1797
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1801
// If we wanted to create it but it already exists then throw an error.
1802
if ( create && !was_created) {
1804
snprintf(str, 120, "Duplicate database: %s", db_name->getCString());
1805
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, str);
1810
// everything looks OK
1811
db = newDatabase(db_location->getCString(), db_name, db_id, create);
1812
db->setBackupDatabase();
1813
release_(db_location);