1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
32
#include <drizzled/common.h>
33
#include <drizzled/session.h>
34
#include <drizzled/table.h>
35
#include <drizzled/message/table.pb.h>
36
#include "drizzled/charset_info.h"
37
#include <drizzled/table_proto.h>
38
#include <drizzled/field.h>
41
#include "cslib/CSConfig.h"
49
#include "cslib/CSGlobal.h"
50
#include "cslib/CSLog.h"
51
#include "cslib/CSDirectory.h"
52
#include "cslib/CSStrUtil.h"
54
#include "database_ms.h"
55
#include "open_table_ms.h"
56
#include "backup_ms.h"
58
#include "temp_log_ms.h"
59
#include "network_ms.h"
62
#include "transaction_ms.h"
63
//#include "systab_variable_ms.h"
64
#include "systab_httpheader_ms.h"
65
#include "parameters_ms.h"
66
#include "pbmsdaemon_ms.h"
70
CSSyncSortedList *MSDatabase::gDatabaseList;
71
CSSparseArray *MSDatabase::gDatabaseArray;
73
* -------------------------------------------------------------------------
77
MSDatabase::MSDatabase():
83
myCompactorThread(NULL),
84
myTempLogThread(NULL),
85
myRepostoryList(NULL),
87
myBlobType(MS_STANDARD_STORAGE),
92
#ifdef HAVE_ALIAS_SUPPORT
106
// Destructor. Stops and releases the backup, temp-log and compactor
// threads, then releases the path and name strings, the temp-log array,
// the table list and array, the repository list, the alias index (only
// when HAVE_ALIAS_SUPPORT is defined) and the cloud object.
MSDatabase::~MSDatabase()
110
iBackupThread->stop();
111
iBackupThread->release();
112
iBackupThread = NULL;
115
if (myTempLogThread) {
116
myTempLogThread->stop();
117
myTempLogThread->release();
118
myTempLogThread = NULL;
120
if (myCompactorThread) {
121
myRepostoryList->wakeup(); // The compactor thread waits on this.
122
myCompactorThread->stop();
123
myCompactorThread->release();
124
myCompactorThread = NULL;
128
myDatabasePath->release();
131
myDatabaseName->release();
133
// Only the pointer is cleared here; the temp-log array is cleared and
// released just below.
iWriteTempLog = NULL;
134
if (myTempLogArray) {
135
myTempLogArray->clear();
136
myTempLogArray->release();
140
iTableList->release();
143
iTableArray->clear();
144
iTableArray->release();
146
if (myRepostoryList) {
147
myRepostoryList->clear();
148
myRepostoryList->release();
150
#ifdef HAVE_ALIAS_SUPPORT
152
iBlobAliases->ma_close();
153
iBlobAliases->release();
157
myBlobCloud->release();
161
// Parses the numeric ID out of a file name. The name is scanned backwards
// for a '-'; the text in front of it is compared with name_part (both its
// length and its characters), and the text at 'num' is parsed as an
// unsigned 32-bit decimal into 'value'.
uint32_t MSDatabase::fileToTableId(const char *file_name, const char *name_part)
166
const char *num = file_name + strlen(file_name) - 1;
168
while (num >= file_name && *num != '-')
171
/* Check the name part of the file: */
172
int len = strlen(name_part);
174
if (len != num - file_name)
176
if (strncmp(file_name, name_part, len) != 0)
181
// Keep a space between the string literal and PRIu32: since C++11 a
// literal immediately followed by an identifier is lexed as a
// user-defined-literal suffix and no longer expands the macro.
sscanf(num, "%" PRIu32 "", &value);
186
// Extracts the table name from a table file name into tab_name.
// Only the last path component of file_name is considered. The name is
// scanned backwards for a '.', and for a matching extension any digits in
// front of it are skipped as well. The leading 'len' bytes are copied to
// tab_name and a pointer to the remainder (the part that was cut off) is
// returned.
const char *MSDatabase::fileToTableName(size_t size, char *tab_name, const char *file_name)
191
file_name = cs_last_name_of_path(file_name);
192
cptr = file_name + strlen(file_name) - 1;
193
while (cptr > file_name && *cptr != '.')
195
if (cptr > file_name && *cptr == '.') {
196
// NOTE(review): only 2 characters are compared, so this matches any
// extension starting with ".b" - confirm whether 3 was intended.
if (strncmp(cptr, ".bs", 2) == 0) {
198
// isdigit() is undefined for negative char values, so convert to
// unsigned char first (file names may contain non-ASCII bytes).
while (cptr > file_name && isdigit((unsigned char) *cptr))
203
len = cptr - file_name;
207
memcpy(tab_name, file_name, len);
210
/* Return a pointer to what was removed! */
211
return file_name + len;
215
// Returns the database name as a C string, as provided by
// myDatabaseName->getCString().
const char *MSDatabase::getDatabaseNameCString()
217
return myDatabaseName->getCString();
220
// Looks a table up by name in iTableList. When the lookup fails, the
// creation code below builds a new MSTable with ID iMaxTableID+1 and
// registers it in iTableList and, with its own retained reference, in
// iTableArray.
// NOTE(review): how the 'create' flag gates the creation path is not
// visible in this listing - confirm.
MSTable *MSDatabase::getTable(CSString *tab_name, bool create)
227
if (!(tab = (MSTable *) iTableList->find(tab_name))) {
230
/* Create a new table: */
231
tab = MSTable::newTable(iMaxTableID+1, RETAIN(tab_name), this, (off64_t) 0, false);
232
iTableList->add(tab);
233
iTableArray->set(iMaxTableID+1, RETAIN(tab));
244
// Convenience overload: wraps tab_name in a new CSString and forwards to
// getTable(CSString *, bool).
MSTable *MSDatabase::getTable(const char *tab_name, bool create)
246
return getTable(CSString::newString(tab_name), create);
250
// Looks a table up by ID in iTableArray. The error path below builds the
// message "Unknown table #<id> in database <name>" and throws
// MS_ERR_UNKNOWN_TABLE.
// NOTE(review): presumably the exception is suppressed when missing_ok is
// set; that check is not visible here - confirm.
MSTable *MSDatabase::getTable(uint32_t tab_id, bool missing_ok)
256
if (!(tab = (MSTable *) iTableArray->get((uint32_t) tab_id))) {
261
char buffer[CS_EXC_MESSAGE_SIZE];
263
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown table #");
264
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) tab_id);
265
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " in database ");
266
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, getDatabaseNameCString());
267
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, buffer);
274
// Iterates over iTableList starting at index i and looks for the next
// table that is not flagged for deletion.
// NOTE(review): 'pos' is presumably the caller's iteration cursor that is
// read into and written back from 'i' - confirm.
MSTable *MSDatabase::getNextTable(uint32_t *pos)
281
while (i < iTableList->getSize()) {
282
tab = (MSTable *) iTableList->itemAt(i++);
283
if (!tab->isToDelete())
294
// Registers a table under the given ID. iMaxTableID is raised when tab_id
// exceeds it; the new MSTable is added to iTableList and, with an extra
// retained reference, stored in iTableArray at index tab_id.
void MSDatabase::addTable(uint32_t tab_id, const char *tab_name, off64_t file_size, bool to_delete)
298
if (tab_id > iMaxTableID)
299
iMaxTableID = tab_id;
300
tab = MSTable::newTable(tab_id, tab_name, this, file_size, to_delete);
301
iTableList->add(tab);
302
iTableArray->set(tab_id, RETAIN(tab));
305
// Registers the table described by a directory entry: the file size is
// read from 'dir', the table ID and table name are derived from
// file_name, and the result is passed on to addTable().
void MSDatabase::addTableFromFile(CSDirectory *dir, const char *file_name, bool to_delete)
309
char tab_name[MS_TABLE_NAME_SIZE];
311
dir->info(NULL, &file_size, NULL);
312
file_id = fileToTableId(file_name);
313
fileToTableName(MS_TABLE_NAME_SIZE, tab_name, file_name);
314
addTable(file_id, tab_name, file_size, to_delete);
317
// Removes 'tab' from both lookup structures: from iTableList by name and
// from iTableArray by ID.
void MSDatabase::removeTable(MSTable *tab)
322
iTableList->remove(tab->myTableName);
323
iTableArray->remove(tab->myTableID);
329
// Marks a table as dropped: the existing entries are removed from
// iTableList and iTableArray and the table is re-added under the same ID,
// name and file size with the 'to_delete' flag set.
void MSDatabase::dropTable(MSTable *tab)
334
iTableList->remove(tab->myTableName);
335
iTableArray->remove(tab->myTableID);
337
// Cute: you drop the table by adding it with the 'to_delete' flag set to 'true'
338
addTable(tab->myTableID, tab->myTableName->getCString(), tab->getTableFileSize(), true);
345
// This function is used when dropping tables from a database before
346
// dropping the database itself.
347
// Skips over the entries of iTableList that are flagged for deletion and
// takes the name of the first table that is not. 'name' starts out NULL.
CSString *MSDatabase::getATableName()
351
CSString *name = NULL;
356
// Empty loop body: stops at the end of the list or at the first table
// that is not flagged for deletion.
while ((tab = (MSTable *) iTableList->itemAt(i++)) && tab->isToDelete()) ;
358
name = tab->getTableName();
365
// Walks all entries of iTableList and examines those that are not flagged
// for deletion; 'cnt' is the running count, 'i' the list index.
uint32_t MSDatabase::getTableCount()
367
uint32_t cnt = 0, i = 0;
373
while ((tab = (MSTable *) iTableList->itemAt(i++))) {
374
if (!tab->isToDelete())
383
// Renames a table by removing its entries from iTableList and iTableArray
// and re-adding it under to_name with the same ID and file size (not
// flagged for deletion).
void MSDatabase::renameTable(MSTable *tab, const char *to_name)
387
iTableList->remove(tab->myTableName);
388
iTableArray->remove(tab->myTableID);
390
addTable(tab->myTableID, to_name, tab->getTableFileSize(), false);
396
// Makes sure the open table has a repository opened for writing. The
// first test checks whether both the write repository and its file are
// already set; if no write repository is set, one is obtained with
// lockRepo(0), and the repository file is then opened for 'otab'.
void MSDatabase::openWriteRepo(MSOpenTable *otab)
398
if (otab->myWriteRepo && otab->myWriteRepoFile)
402
if (!otab->myWriteRepo)
403
otab->myWriteRepo = lockRepo(0);
405
/* Now open the repo file for the open table: */
406
otab->myWriteRepo->openRepoFileForWriting(otab);
410
MSRepository *MSDatabase::getRepoFullOfTrash(time_t *ret_wait_time)
412
MSRepository *repo = NULL;
413
time_t wait_time = 0;
416
wait_time = *ret_wait_time;
418
lock_(myRepostoryList);
419
for (uint32_t i=0; i<myRepostoryList->size(); i++) {
421
if ((repo = (MSRepository *) myRepostoryList->get(i))) {
422
if (!repo->isRemovingFP && !repo->mustBeDeleted && !repo->isRepoLocked()) {
423
if (!repo->myRepoHeadSize) {
424
/* The file has not yet been opened, so the
425
* garbage count will not be known!
427
MSRepoFile *repo_file;
430
unlock_(myRepostoryList);
432
repo_file = repo->openRepoFile();
433
repo_file->release();
435
lock_(myRepostoryList);
438
if (repo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
439
/* Make sure there are not temp BLOBs in this repository that have
442
time_t now = time(NULL);
443
time_t then = repo->myLastTempTime;
445
/* Check if there are any temp BLOBs to be removed: */
446
if (now > (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
447
repo->lockRepo(REPO_COMPACTING);
452
/* There are temp BLOBs to wait for... */
453
if (!wait_time || wait_time > MSTempLog::adjustWaitTime(then, now))
454
wait_time = MSTempLog::adjustWaitTime(then, now);
461
unlock_(myRepostoryList);
463
*ret_wait_time = wait_time;
467
// Selects a repository to write to and locks it with REPO_WRITE.
// With myRepostoryList locked, the list is searched for a repository that
// is not locked, not being removed, not marked for deletion, would stay
// below the repository size threshold after adding 'size' bytes, and is
// below the garbage threshold. The code after the search creates a new
// MSRepository with ID free_slot + 1 and stores it at index free_slot.
MSRepository *MSDatabase::lockRepo(off64_t size)
473
lock_(myRepostoryList);
474
// Default slot for a new repository: one past the current end.
free_slot = myRepostoryList->size();
475
/* Find an unlocked repository file that is below the write threshold: */
476
for (uint32_t i=0; i<myRepostoryList->size(); i++) {
477
if ((repo = (MSRepository *) myRepostoryList->get(i))) {
478
if ((!repo->isRepoLocked()) && (!repo->isRemovingFP) && (!repo->mustBeDeleted) &&
479
((repo->myRepoFileSize + size) < PBMSParameters::getRepoThreshold())
480
/**/ && (repo->getGarbageLevel() < PBMSParameters::getGarbageThreshold()))
489
/* None found, create a new repo file: */
490
// Repository IDs are 1-based while list indexes are 0-based.
new_(repo, MSRepository(free_slot + 1, this, 0));
491
myRepostoryList->set(free_slot, repo);
495
repo->lockRepo(REPO_WRITE); // <- The MSRepository::backToPool() will unlock this.
496
unlock_(myRepostoryList);
500
// Obtains a repository file object for the repository with the given
// 1-based ID. The repository is looked up at index repo_id - 1 under the
// myRepostoryList lock. The error paths throw MS_ERR_NOT_FOUND ("Unknown
// repository file: <id>") and MS_ERR_REMOVING_REPO ("Repository will be
// removed: <id>"). Otherwise the repository is retained and a file is
// taken from it via getRepoFile(); the code further down opens a new file
// with openRepoFile(), outside the lock, and registers it with
// addRepoFile() under the lock.
// NOTE(review): how missing_ok affects the not-found path is not visible
// in this listing - confirm.
MSRepoFile *MSDatabase::getRepoFileFromPool(uint32_t repo_id, bool missing_ok)
506
lock_(myRepostoryList);
507
if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1))) {
509
char buffer[CS_EXC_MESSAGE_SIZE];
511
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown repository file: ");
512
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
513
CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
515
unlock_(myRepostoryList);
518
if (repo->isRemovingFP) {
519
char buffer[CS_EXC_MESSAGE_SIZE];
521
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Repository will be removed: ");
522
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
523
CSException::throwException(CS_CONTEXT, MS_ERR_REMOVING_REPO, buffer);
525
repo->retain(); /* Release is here: [++] */
526
file = repo->getRepoFile();
527
unlock_(myRepostoryList);
530
// The file is opened while the list is unlocked; the lock is taken
// again only to register the new file with the repository.
file = repo->openRepoFile();
531
lock_(myRepostoryList);
532
repo->addRepoFile(file);
534
unlock_(myRepostoryList);
539
// Hands a repository file back to its repository, under the
// myRepostoryList lock. For a repository that is being removed, the file
// is removed from it and waiters on myRepostoryList are woken. The other
// visible path returns the file with returnRepoFile(). The repository
// reference marked [++] is released here.
void MSDatabase::returnRepoFileToPool(MSRepoFile *file)
544
lock_(myRepostoryList);
546
if ((repo = file->myRepo)) {
547
if (repo->isRemovingFP) {
548
repo->removeRepoFile(file);
549
myRepostoryList->wakeup();
552
repo->returnRepoFile(file);
553
repo->release(); /* [++] here is the release. */
556
unlock_(myRepostoryList);
560
// Removes the repository with the given 1-based ID from myRepostoryList.
// The loop runs under the list lock for as long as *mustQuit (when a
// pointer is supplied) is false and the database is not closing. Each pass
// marks the repository as being removed and tries to remove its unused
// files; on success the list slot is set to NULL, and the loop also
// contains a wait on myRepostoryList.
void MSDatabase::removeRepo(uint32_t repo_id, bool *mustQuit)
565
lock_(myRepostoryList);
566
while ((!mustQuit || !*mustQuit) && !iClosing) {
567
if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1)))
569
repo->isRemovingFP = true;
570
if (repo->removeRepoFilesNotInUse()) {
571
myRepostoryList->set(repo_id - 1, NULL);
575
* Wait for the files that are in use to be
578
myRepostoryList->wait();
580
unlock_(myRepostoryList);
584
// Appends one event record to the current write temp log.
// Under the myTempLogArray lock the function makes sure a write log
// exists (creating log 1 if the array is empty), makes sure
// otab->myTempLogFile refers to the current write log, rolls over to a
// new log (ID + 1) once the size threshold has been reached, and reserves
// space for one record by advancing myTempLogSize. The record itself is
// filled in and written after the lock has been released.
// *log_id and *log_offset receive the log ID and the offset of the
// reserved record.
void MSDatabase::queueTempLogEvent(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
585
uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time)
587
MSTempLogItemRec item;
590
// Each otab object holds a handle to an instance of an OPEN
591
// temp log. This is so that each thread has its own open temp log
592
// and does not need to open and close it constantly.
595
lock_(myTempLogArray);
596
if (!iWriteTempLog) {
597
iWriteTempLog = (MSTempLog *) myTempLogArray->last();
598
if (!iWriteTempLog) {
599
new_(iWriteTempLog, MSTempLog(1, this, 0));
600
myTempLogArray->set(1, iWriteTempLog);
603
if (!otab->myTempLogFile)
604
otab->myTempLogFile = iWriteTempLog->openTempLog();
605
else if (otab->myTempLogFile->myTempLogID != iWriteTempLog->myLogID) {
606
// The open table still holds a file of an older log: replace it.
otab->myTempLogFile->release();
607
otab->myTempLogFile = NULL;
608
otab->myTempLogFile = iWriteTempLog->openTempLog();
611
if (iWriteTempLog->myTempLogSize >= PBMSParameters::getTempLogThreshold()) {
612
// The current log is full: start a new one with the next ID.
uint32_t tmp_log_id = iWriteTempLog->myLogID + 1;
614
new_(iWriteTempLog, MSTempLog(tmp_log_id, this, 0));
615
myTempLogArray->set(tmp_log_id, iWriteTempLog);
617
otab->myTempLogFile->release();
618
otab->myTempLogFile = NULL;
619
otab->myTempLogFile = iWriteTempLog->openTempLog();
624
*log_id = iWriteTempLog->myLogID;
625
*log_offset = (uint32_t) iWriteTempLog->myTempLogSize;
628
// Reserve the space for this record while still holding the lock.
iWriteTempLog->myTempLogSize += iWriteTempLog->myTemplogRecSize;
629
unlock_(myTempLogArray);
631
CS_SET_DISK_1(item.ti_type_1, type);
632
CS_SET_DISK_4(item.ti_table_id_4, tab_id);
633
CS_SET_DISK_6(item.ti_blob_id_6, blob_id);
634
CS_SET_DISK_4(item.ti_auth_code_4, auth_code);
635
CS_SET_DISK_4(item.ti_time_4, timev);
636
otab->myTempLogFile->write(&item, *log_offset, sizeof(MSTempLogItemRec));
642
#ifdef HAVE_ALIAS_SUPPORT
643
// Queues a temp-log event for a BLOB (see queueTempLogEvent()) and removes
// the BLOB's alias, described by aliasDiskRec, from the alias index. An
// exception is logged via self->logException().
// NOTE(review): the try/catch structure around the alias removal is not
// visible in this listing - confirm which call the logging belongs to.
void MSDatabase::queueForDeletion(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
644
uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time, MSDiskAliasPtr aliasDiskRec)
648
queueTempLogEvent(otab, type, tab_id, blob_id, auth_code, log_id, log_offset, q_time);
650
// If it has an alias remove it from the alias index.
653
deleteBlobAlias(aliasDiskRec);
656
self->logException();
664
// Opens a temp log file under the myTempLogArray lock. The log is taken
// either from the array slot log_id or from the first entry of the array;
// *log_rec_size and *log_head_size receive the record and header sizes of
// that log.
// NOTE(review): the conditions selecting between get(log_id) and first(),
// and guarding the output pointers, are not visible here - confirm.
MSTempLogFile *MSDatabase::openTempLogFile(uint32_t log_id, size_t *log_rec_size, size_t *log_head_size)
667
MSTempLogFile *log_file = NULL;
670
lock_(myTempLogArray);
672
log = (MSTempLog *) myTempLogArray->get(log_id);
674
log = (MSTempLog *) myTempLogArray->first();
676
log_file = log->openTempLog();
678
*log_rec_size = log->myTemplogRecSize;
680
*log_head_size = log->myTempLogHeadSize;
682
unlock_(myTempLogArray);
686
// Reads the number of entries in myTempLogArray while holding its lock.
uint32_t MSDatabase::getTempLogCount()
691
lock_(myTempLogArray);
692
count = myTempLogArray->size();
693
unlock_(myTempLogArray);
697
// Removes the temp log with the given ID from myTempLogArray while
// holding the array's lock.
void MSDatabase::removeTempLog(uint32_t log_id)
700
lock_(myTempLogArray);
701
myTempLogArray->remove(log_id);
702
unlock_(myTempLogArray);
706
// Returns the key of this object: the database name.
CSObject *MSDatabase::getKey()
708
return (CSObject *) myDatabaseName;
711
// Compares this database's name with 'key', which is treated as a
// CSString; returns the result of CSString::compare().
int MSDatabase::compareKey(CSObject *key)
713
return myDatabaseName->compare((CSString *) key);
716
// Returns the compactor thread pointer (NULL once the thread has been
// stopped, see the shutdown code that resets it).
MSCompactorThread *MSDatabase::getCompactorThread()
718
return myCompactorThread;
721
// Returns the repository list of this database.
CSSyncVector *MSDatabase::getRepositoryList()
723
return myRepostoryList;
726
#ifdef HAVE_ALIAS_SUPPORT
727
// Adds an alias for the BLOB at (repo_id, repo_offset) to the alias index
// under iBlobAliaseLock and obtains its hash. 'can_retry' starts out true;
// the retry path unlocks and flushes all committed transactions via
// MSTransactionManager::flush().
// NOTE(review): the try/catch and retry control flow is not visible in
// this listing - confirm how can_retry is consumed.
uint32_t MSDatabase::registerBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
730
bool can_retry = true;
734
lock_(&iBlobAliaseLock);
737
hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
741
unlock_(&iBlobAliaseLock);
743
// It can be that a duplicate alias exists that was deleted
744
// but the transaction has not been written to the repository yet.
745
// Flush all committed transactions to the repository file.
746
MSTransactionManager::flush();
754
unlock_(&iBlobAliaseLock);
758
// Replaces the alias of the BLOB at (repo_id, repo_offset): under
// iBlobAliaseLock the new alias is added first, then the entry with
// old_alias_hash is deleted.
uint32_t MSDatabase::updateBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t old_alias_hash, const char *alias)
762
lock_(&iBlobAliaseLock);
764
new_hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
765
iBlobAliases->deleteAlias(repo_id, repo_offset, old_alias_hash);
767
unlock_(&iBlobAliaseLock);
771
// Deletes the alias described by the disk record from the alias index
// while holding iBlobAliaseLock.
void MSDatabase::deleteBlobAlias(MSDiskAliasPtr diskRec)
774
lock_(&iBlobAliaseLock);
775
iBlobAliases->deleteAlias(diskRec);
776
unlock_(&iBlobAliaseLock);
780
// Convenience overload: packs repo ID, offset and alias hash into a
// temporary MSDiskAliasRec and forwards to deleteBlobAlias(MSDiskAliasPtr).
void MSDatabase::deleteBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
782
MSDiskAliasRec diskRec;
784
CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);
785
CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);
786
CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
787
deleteBlobAlias(&diskRec);
790
// Re-points an existing alias from the old (repo ID, offset) location to
// the new one via resetAlias(), while holding iBlobAliaseLock.
void MSDatabase::moveBlobAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
793
lock_(&iBlobAliaseLock);
794
iBlobAliases->resetAlias(old_repo_id, old_repo_offset, alias_hash, new_repo_id, new_repo_offset);
795
unlock_(&iBlobAliaseLock);
800
// Checks whether 'name' is a known HTTP metadata header field. For names
// that differ (case-insensitively) from MS_ALIAS_TAG, the name is looked
// up in iHTTPMetaDataHeaders under that list's lock.
// NOTE(review): the branch taken when the name equals MS_ALIAS_TAG is not
// visible in this listing - confirm.
bool MSDatabase::isValidHeaderField(const char *name)
802
bool is_valid = false;
807
// strcasecmp() returns non-zero when the strings differ.
if (strcasecmp(name, MS_ALIAS_TAG)) {
808
lock_(&iHTTPMetaDataHeaders);
809
header = CSString::newString(name);
812
is_valid = (iHTTPMetaDataHeaders.find(header) != NULL);
815
unlock_(&iHTTPMetaDataHeaders);
823
// Global start-up: creates the global database list and array, installs
// the default HTTP metadata headers, starts the system tables and resets
// the backup database ID counter to 1.
void MSDatabase::startUp(const char *default_http_headers)
827
new_(gDatabaseList, CSSyncSortedList);
828
new_(gDatabaseArray, CSSparseArray(5));
829
MSHTTPHeaderTable::setDefaultMetaDataHeaders(default_http_headers);
830
PBMSSystemTables::systemTablesStartUp();
831
PBMSParameters::setBackupDatabaseID(1);
835
// Stops the background threads of the databases in gDatabaseList, under
// the list's lock. For each database the temp-log, compactor and backup
// threads are stopped, released and their pointers reset to NULL.
void MSDatabase::stopThreads()
841
lock_(gDatabaseList);
843
if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
847
if (db->myTempLogThread) {
848
db->myTempLogThread->stop();
849
db->myTempLogThread->release();
850
db->myTempLogThread = NULL;
852
if (db->myCompactorThread) {
853
db->myRepostoryList->wakeup(); // The compactor thread waits on this.
854
db->myCompactorThread->stop();
855
db->myCompactorThread->release();
856
db->myCompactorThread = NULL;
859
if (db->iBackupThread) {
860
db->iBackupThread->stop();
861
db->iBackupThread->release();
862
db->iBackupThread = NULL;
867
unlock_(gDatabaseList);
872
// Global shutdown: clears and releases the global database array and
// list, releases the default HTTP metadata headers and shuts down the
// system tables.
void MSDatabase::shutDown()
875
if (gDatabaseArray) {
876
gDatabaseArray->clear();
877
gDatabaseArray->release();
878
gDatabaseArray = NULL;
882
gDatabaseList->clear();
883
gDatabaseList->release();
884
gDatabaseList = NULL;
887
MSHTTPHeaderTable::releaseDefaultMetaDataHeaders();
888
PBMSSystemTables::systemTableShutDown();
891
// Turns this database object into a backup database. Under the
// gDatabaseList lock it is given the next fake backup database ID,
// registered in gDatabaseArray and its cloud storage is flagged as a
// backup. Afterwards the database directory is renamed to the path with
// "#" appended, and myDatabasePath is updated to the new path.
void MSDatabase::setBackupDatabase()
894
// I need to give the backup database a unique fake database ID.
895
// This is so that it is not confused with the database being
896
// backed up when opening tables.
898
// Normally database IDs are generated by time(NULL) so small database IDs
899
// are safe to use as fake IDs.
901
lock_(gDatabaseList);
902
myDatabaseID = PBMSParameters::getBackupDatabaseID() +1;
903
PBMSParameters::setBackupDatabaseID(myDatabaseID);
904
gDatabaseArray->set(myDatabaseID, RETAIN(this));
907
// Notify the cloud storage, if any, that it is a backup.
908
// This is important because if the backup database is dropped
909
// we need to be sure that only the BLOBs belonging to the
910
// backup are removed from the cloud.
911
myBlobCloud->cl_setCloudIsBackup();
913
unlock_(gDatabaseList);
915
// Rename the database path so that it is obvious that this is an incomplete backup database.
916
// When the backup is completed it will be renamed back.
917
CSPath *new_path = CSPath::newPath(myDatabasePath->concat("#"));
920
if (new_path->exists())
923
CSPath *db_path = CSPath::newPath(RETAIN(myDatabasePath));
926
db_path->rename(new_path->getNameCString());
927
// Swap the stored path for the renamed one, taking our own reference.
myDatabasePath->release();
928
myDatabasePath = new_path->getString();
929
myDatabasePath->retain();
938
// Finishes a successful backup: the last character of myDatabasePath is
// cut off (setLength(length()-1)), the directory is renamed to the last
// path component of the shortened path, and the backup database object is
// removed from gDatabaseArray and from the table list under the
// gDatabaseList lock.
void MSDatabase::releaseBackupDatabase()
943
// The backup has completed successfully, rename the path to the correct name.
944
CSPath *db_path = CSPath::newPath(myDatabasePath->getCString());
947
myDatabasePath->setLength(myDatabasePath->length()-1);
948
db_path->rename(cs_last_name_of_path(myDatabasePath->getCString()));
951
// Remove the backup database object.
952
lock_(gDatabaseList);
953
gDatabaseArray->remove(myDatabaseID);
954
MSTableList::removeDatabaseTables(this); // Will also release the database object.
955
unlock_(gDatabaseList);
961
// Starts a backup of this database. If the existing backup thread is
// still running, an exception is thrown ("A backup is still running.",
// reported with error code MS_ERR_DUPLICATE_DB); a finished thread is
// released. A new MSBackup is then created from backup_info and started
// with a retained reference to this database. The final release/reset of
// iBackupThread belongs to a path whose condition is not visible here.
void MSDatabase::startBackup(MSBackupInfo *backup_info)
967
if (iBackupThread->isRunning()) {
968
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, "A backup is still running.");
970
iBackupThread->release();
971
iBackupThread = NULL;
975
iBackupThread = MSBackup::newMSBackup(backup_info);
977
push_(iBackupThread);
979
iBackupThread->startBackup(RETAIN(this));
983
iBackupThread->release();
984
iBackupThread = NULL;
992
// Reports the progress of the current backup.
//   total        - receives the total backup size
//   completed    - receives the size backed up so far
//   completed_ok - receives whether the backup status is 0 (no error)
// 'done' is set when the backup thread is no longer running. The second
// group of assignments covers the case without a backup: it reports done,
// OK and zero sizes.
bool MSDatabase::backupStatus(uint64_t *total, uint64_t *completed, bool *completed_ok)
999
*total = iBackupThread->getBackupSize();
1000
*completed = iBackupThread->getBackupCompletedSize();
1001
done = !iBackupThread->isRunning();
1002
// Store the status in *completed_ok. The previous code assigned it to
// *completed, which destroyed the byte count written two lines above
// and left *completed_ok unset on this path.
*completed_ok = (iBackupThread->getStatus() == 0);
1004
*completed_ok = done = true;
1005
*total = *completed = 0;
1011
// Returns the ID of the current backup, or 0 when there is no backup
// thread.
uint32_t MSDatabase::backupID()
1013
return (iBackupThread)?iBackupThread->backupID(): 0;
1016
// Stops and releases the backup thread, if there is one, and resets the
// pointer to NULL.
void MSDatabase::terminateBackup()
1018
if (iBackupThread) {
1019
iBackupThread->stop();
1020
iBackupThread->release();
1021
iBackupThread = NULL;
1025
// Looks a database up by name in gDatabaseList, under the list's lock.
// When it is not in the list, it is loaded via loadDatabase(), which is
// given a retained reference to db_name and the 'create' flag.
MSDatabase *MSDatabase::getDatabase(CSString *db_name, bool create)
1032
lock_(gDatabaseList);
1033
if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1034
db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1041
unlock_(gDatabaseList);
1046
// Convenience overload: wraps db_name in a new CSString and forwards to
// getDatabase(CSString *, bool).
MSDatabase *MSDatabase::getDatabase(const char *db_name, bool create)
1048
return getDatabase(CSString::newString(db_name), create);
1051
// Looks a database up by ID. The ID is first looked up in gDatabaseArray
// (under the gDatabaseList lock). The fallback scans the PBMS directory
// for a sub-directory whose name ends in "-<number>" with the number equal
// to db_id; the part in front of the '-' is used as the database name and
// the database is loaded through getDatabase(name, true). The error path
// throws MS_ERR_UNKNOWN_DB with the message "Unknown database #<id>".
MSDatabase *MSDatabase::getDatabase(uint32_t db_id)
1056
lock_(gDatabaseList);
1057
if ((db = (MSDatabase *) gDatabaseArray->get((uint32_t) db_id)))
1060
// Look for the database folder with the correct ID:
1061
CSPath *path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1063
if (path->exists()) {
1065
dir = CSDirectory::newDirectory(RETAIN(path));
1069
while (dir->next() && !db) {
1070
if (!dir->isFile()) {
1071
const char *ptr, *dir_name = dir->name();
1072
ptr = dir_name + strlen(dir_name) -1;
1074
// Scan backwards for the '-' that separates name and ID.
while (ptr > dir_name && *ptr != '-') ptr--;
1077
int len = ptr - dir_name;
1079
if ((strtoul(ptr, NULL, 10) == db_id) && len) {
1080
db = getDatabase(CSCString::newString(dir_name, len), true);
1081
ASSERT(db->myDatabaseID == db_id);
1090
unlock_(gDatabaseList);
1093
char buffer[CS_EXC_MESSAGE_SIZE];
1095
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database #");
1096
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) db_id);
1097
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
1102
// Wakes the temp-log thread of every database in gDatabaseList that has
// one, while holding the list's lock.
void MSDatabase::wakeTempLogThreads()
1109
lock_(gDatabaseList);
1110
for (int i=0;;i++) {
1111
// itemAt() returning NULL marks the end of the list.
if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
1113
if (db->myTempLogThread)
1114
db->myTempLogThread->wakeup();
1116
unlock_(gDatabaseList);
1120
// Determines the database ID for db_name inside the directory 'path'.
// First pass: the sub-directories are searched for one named
// "<db_name>-<digits>"; if found, the digits become db_id.
// Second part: a loop that searches for a unique db_id by comparing the
// candidate against the numeric suffix of every sub-directory; it contains
// a one-second sleep before trying again.
uint32_t MSDatabase::getDBID(CSPath *path, CSString *db_name)
1124
int len = db_name->length();
1131
// Search for the ID of the database
1132
dir = CSDirectory::newDirectory(RETAIN(path));
1135
while (dir->next() && !db_id)
1137
if (!dir->isFile()){
1138
ptr = dir->name() + strlen(dir->name()) -1;
1139
// isdigit() is undefined for negative char values, so convert to
// unsigned char first (directory names may contain non-ASCII bytes).
while (ptr > dir->name() && isdigit((unsigned char) *ptr)) ptr--;
1140
if ((*ptr == '-') && (len == (ptr - dir->name())) && !db_name->compare(dir->name(), len) ) {
1141
db_id = atol(ptr+1);
1150
while (1) { // search for a unique db_id
1151
dir = CSDirectory::newDirectory(RETAIN(path));
1154
while (db_id && dir->next()) {
1155
if (!dir->isFile()) {
1156
ptr = dir->name() + strlen(dir->name()) -1;
1157
while (ptr > dir->name() && isdigit((unsigned char) *ptr)) ptr--;
1158
if ((*ptr == '-') && (db_id == strtoul(ptr+1, NULL, 10))) {
1166
sleep(1); // Allow 1 second to pass.
1176
CSPath *MSDatabase::createDatabasePath(const char *location, CSString *db_name, uint32_t *db_id_ptr, bool *create, bool is_pbms)
1178
bool create_path = *create;
1179
CSPath *path = NULL;
1180
char name_buffer[MS_DATABASE_NAME_SIZE + 40];
1186
path = CSPath::newPath(location, "pbms");
1188
if (!path->exists()) {
1199
// If this is the pbms database then nothing more is to be done.
1203
if ((!db_id_ptr) || !*db_id_ptr) {
1204
db_id = getDBID(RETAIN(path), RETAIN(db_name));
1210
// Create the PBMS database name with ID
1211
cs_strcpy(MS_DATABASE_NAME_SIZE + 40, name_buffer, db_name->getCString());
1212
cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, "-");
1213
cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, (uint32_t) db_id);
1216
path = CSPath::newPath(path, name_buffer);
1218
if (!path->exists()) {
1238
MSDatabase *MSDatabase::newDatabase(const char *db_location, CSString *db_name, uint32_t db_id, bool create)
1240
MSDatabase *db = NULL;
1244
const char *file_name;
1248
uint32_t to_delete = 0;
1250
bool is_pbms = false;
1256
//is_pbms = (strcmp(db_name->getCString(), "pbms") == 0); To be done later.
1258
/* Block the creation of the pbms database if there is no MySQL database. */
1259
path = CSPath::newPath(ms_my_get_mysql_home_path(), RETAIN(db_name));
1261
if (create && !path->exists()) {
1262
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1266
// Create the database path, if 'create' == false then it can return NULL
1267
path = createDatabasePath(db_location, RETAIN(db_name), &db_id, &create, is_pbms);
1274
// Create the database object and initialize it.
1275
if (!(db = new MSDatabase())) {
1276
CSException::throwOSError(CS_CONTEXT, ENOMEM);
1279
db->myIsPBMS = is_pbms;
1280
db_path = path->getString();
1285
db->iNextBlobRefId = (uint32_t) time(NULL);
1286
db->iNextBlobRefId <<= 32;
1287
db->iNextBlobRefId = COMMIT_MASK(db->iNextBlobRefId);
1288
db->iNextBlobRefId++;
1290
db->myDatabaseID = db_id;
1291
db->myDatabasePath = db_path;
1292
db->myDatabaseName = db_name;
1293
new_(db->myBlobCloud, CloudDB(db_id));
1300
new_(db->myTempLogArray, CSSyncSparseArray(20));
1301
new_(db->iTableList, CSSyncSortedList());
1302
new_(db->iTableArray, CSSparseArray(20));
1303
new_(db->myRepostoryList, CSSyncVector(20));
1306
#ifdef HAVE_ALIAS_SUPPORT
1307
//db->retain(); no retain here, MSAlias() takes a back ref.
1308
new_(db->iBlobAliases, MSAlias(db));
1310
/* "Load" the database: */
1312
/* Get the max table ID: */
1313
dir = CSDirectory::newDirectory(RETAIN(db_path));
1316
while (dir->next()) {
1317
file_name = dir->name();
1318
if (dir->isFile() && cs_is_extension(file_name, "bst"))
1319
db->addTableFromFile(dir, file_name, false);
1323
path = CSPath::newPath(RETAIN(db_path), "bs-repository");
1324
if (path->exists()) {
1325
dir = CSDirectory::newDirectory(path);
1328
while (dir->next()) {
1329
file_name = dir->name();
1330
if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1331
if ((file_id = fileToTableId(file_name, "repo"))) {
1332
dir->info(NULL, &file_size, NULL);
1333
new_(repo, MSRepository(file_id, db, file_size));
1334
db->myRepostoryList->set(file_id - 1, repo);
1345
path = CSPath::newPath(RETAIN(db_path), "bs-logs");
1346
if (path->exists()) {
1347
dir = CSDirectory::newDirectory(path);
1350
while (dir->next()) {
1351
file_name = dir->name();
1352
if (dir->isFile()) {
1353
if (cs_is_extension(file_name, "bs")) {
1354
if ((file_id = fileToTableId(file_name, "temp"))) {
1355
dir->info(NULL, &file_size, NULL);
1356
new_(log, MSTempLog(file_id, db, file_size));
1357
db->myTempLogArray->set(file_id, log);
1360
else if (cs_is_extension(file_name, "bst")) {
1361
db->addTableFromFile(dir, file_name, true);
1374
/* Go through and prepare all the tables that are to
1380
while ((tab = (MSTable *) db->iTableList->itemAt(i))) {
1381
if (tab->isToDelete())
1382
tab->prepareToDelete();
1393
// Starts the background threads of this database. With alias support the
// alias index is opened first. Then the temp-log thread is created and
// started, followed by the compactor thread, whose constructor argument
// depends on whether MS_COMPACTOR_POLLS is defined.
void MSDatabase::startThreads()
1400
#ifdef HAVE_ALIAS_SUPPORT
1401
// iBlobAliases->ma_open() must be called before starting any threads.
1402
iBlobAliases->ma_open();
1405
new_(myTempLogThread, MSTempLogThread(1 * 1000, this));
1406
myTempLogThread->start();
1408
#ifdef MS_COMPACTOR_POLLS
1409
new_(myCompactorThread, MSCompactorThread(MS_COMPACTOR_POLL_FREQ, this));
1411
new_(myCompactorThread, MSCompactorThread(MS_DEFAULT_COMPACTOR_WAIT * 1000, this));
1415
myCompactorThread->start();
1420
// Instance part of dropping a database: stops and releases the backup,
// temp-log and compactor threads, then asks the cloud storage to drop the
// database. An exception is logged via self->logException().
// NOTE(review): the try/catch structure around cl_dropDB() is not visible
// in this listing - confirm which call the logging belongs to.
void MSDatabase::dropDatabase()
1427
if (iBackupThread) {
1428
iBackupThread->stop();
1429
iBackupThread->release();
1430
iBackupThread = NULL;
1433
if (myTempLogThread) {
1434
myTempLogThread->stop();
1435
myTempLogThread->release();
1436
myTempLogThread = NULL;
1439
if (myCompactorThread) {
1440
myRepostoryList->wakeup(); // The compactor thread waits on this.
1441
myCompactorThread->stop();
1442
myCompactorThread->release();
1443
myCompactorThread = NULL;
1446
// Call cloud drop database even if the database is not currently
1447
// using cloud storage just in case it was in the past. If the connection
1448
// to the cloud is not setup then nothing will be done.
1450
myBlobCloud->cl_dropDB();
1453
self->logException();
1459
void MSDatabase::removeDatabasePath(CSString *doomedDatabasePath )
1461
CSPath *path = NULL;
1462
CSDirectory *dir = NULL;
1463
const char *file_name;
1466
push_(doomedDatabasePath);
1468
// Delete repository files
1469
path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-repository");
1471
if (path->exists()) {
1472
dir = CSDirectory::newDirectory(RETAIN(path));
1475
while (dir->next()) {
1476
file_name = dir->name();
1477
if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1482
if (path->isEmpty())
1487
// Delete temp log files.
1488
path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-logs");
1490
if (path->exists()) {
1491
dir = CSDirectory::newDirectory(RETAIN(path));
1494
while (dir->next()) {
1495
file_name = dir->name();
1496
if (dir->isFile() && (cs_is_extension(file_name, "bs") || cs_is_extension(file_name, "bst"))) {
1501
if (path->isEmpty())
1506
// Delete table reference files.
1507
dir = CSDirectory::newDirectory(RETAIN(doomedDatabasePath));
1510
while (dir->next()) {
1511
file_name = dir->name();
1512
if (dir->isFile() && cs_is_extension(file_name, "bst"))
1517
#ifdef HAVE_ALIAS_SUPPORT
1518
path = CSPath::newPath(RETAIN(doomedDatabasePath), ACTIVE_ALIAS_INDEX);
1524
PBMSSystemTables::removeSystemTables(RETAIN(doomedDatabasePath));
1526
path = CSPath::newPath(RETAIN(doomedDatabasePath));
1528
if (path->isEmpty() && !path->isLink()) {
1531
CSStringBuffer *new_name;
1532
// If the database folder is not empty we rename it to get it out of the way.
1533
// If it is not renamed it will be reused if a database with the same name is
1534
// created again wich will result in the database ID being reused which may
1535
// have some bad side effects.
1536
new_(new_name, CSStringBuffer());
1538
new_name->append(cs_last_name_of_path(doomedDatabasePath->getCString()));
1539
new_name->append("_DROPPED");
1540
path->rename(new_name->getCString());
1545
release_(doomedDatabasePath);
1547
path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1549
if (path->isEmpty() && !path->isLink()) {
1557
/* Drop the PBMS database if it exists.
1558
* The root folder 'pbms' will be deleted also
1559
* if it is empty and not a symbolic link.
1560
* The database folder in 'pbms' is deleted if it is empty and
1561
* it is not a symbolic link.
1563
// Drops a database and removes its files.
// With a loaded database object: pending transactions for its ID are
// dropped, its path is retained for later use, its tables and system
// tables are removed, its threads are shut down, and it is removed from
// gDatabaseArray and (unless it is a backup) from gDatabaseList before
// being released.
// The second group of statements resolves the path from db_name via
// createDatabasePath() with create == false and drops pending
// transactions for the resulting ID.
// Finally, if a path was determined, removeDatabasePath() deletes it.
void MSDatabase::dropDatabase(MSDatabase *doomedDatabase, const char *db_name )
1565
CSString *doomedDatabasePath = NULL;
1569
if (doomedDatabase) {
1570
push_(doomedDatabase);
1572
// Remove any pending transactions for the dropped database.
1573
// This is important because if the database is restored it will have the
1574
// same database ID and the old transactions would be applied to it.
1575
MSTransactionManager::dropDatabase(doomedDatabase->myDatabaseID);
1577
doomedDatabasePath = doomedDatabase->myDatabasePath;
1578
doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1580
MSTableList::removeDatabaseTables(RETAIN(doomedDatabase));
1581
MSSystemTableShare::removeDatabaseSystemTables(RETAIN(doomedDatabase));
1583
doomedDatabase->dropDatabase(); // Shutdown database threads.
1585
// To avoid a deadlock a lock is not taken on the database list
1586
// if shutdown is in progress. The only database that would be
1587
// dropped during a shutdown is an incomplete backup database.
1588
ASSERT(doomedDatabase->isBackup || !self->myMustQuit);
1589
if (!self->myMustQuit)
1590
lock_(gDatabaseList); // Be sure to shutdown the database before locking this or it can lead to deadlocks
1592
gDatabaseArray->remove(doomedDatabase->myDatabaseID);
1593
if (!doomedDatabase->isBackup)
1594
gDatabaseList->remove(doomedDatabase->getKey());
1595
if (!self->myMustQuit)
1596
unlock_(gDatabaseList);
1597
// At this point only the reference pushed above should remain.
ASSERT(doomedDatabase->iRefCount == 1);
1598
release_(doomedDatabase);
1602
bool create = false;
1605
path = createDatabasePath(ms_my_get_mysql_home_path(), CSString::newString(db_name), &db_id, &create);
1608
MSTransactionManager::dropDatabase(db_id);
1611
doomedDatabasePath = path->getString();
1612
doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1617
if (doomedDatabasePath)
1618
removeDatabasePath(doomedDatabasePath);
1623
// Drops a database by name: the database object is looked up without
// creating it (create == false) and passed, together with the name, to
// dropDatabase(MSDatabase *, const char *).
void MSDatabase::dropDatabase(const char *db_name )
1626
dropDatabase(getDatabase(db_name, false), db_name);
1630
// The table_path can be several things here:
1631
// 1: <absolute path>/<database>/<table>
1632
// 2: <absolute path>/<database>
1633
// 3: <database>/<table>
1634
bool MSDatabase::convertTablePathToIDs(const char *table_path, uint32_t *db_id, uint32_t *tab_id, bool create)
1636
const char *base = ms_my_get_mysql_home_path();
1637
CSString *table_url;
1638
CSString *db_path = NULL;
1639
CSString *db_name = NULL;
1640
CSString *tab_name = NULL;
1647
table_url = CSString::newString(table_path);
1648
if (table_url->startsWith(base)) {
1649
table_url = table_url->right(base);
1654
db_path = table_url->left("/", -1);
1656
tab_name = table_url->right("/", -1);
1659
release_(table_url);
1661
if (db_path->length() == 0) { // Only a database name was supplied.
1666
if (tab_name->length() == 0) {
1667
tab_name->release();
1672
db_name = db_path->right("/", -1);
1674
if (db_name->length() == 0) {
1683
db = MSDatabase::getDatabase(db_name, create); // This will release db_name
1685
*db_id = db->myDatabaseID;
1690
tab = db->getTable(tab_name, create);// This will release tab_name
1693
*tab_id = tab->myTableID;
1701
return_((*tab_id > 0) && (*db_id > 0));
1704
// Translates a database name and a table name into their numeric IDs.
// The database is obtained via getDatabase(db_name, create) and the table
// via db->getTable(tab_name, create); *db_id and *tab_id receive the IDs.
// Returns true only when both IDs are greater than 0.
bool MSDatabase::convertTableAndDatabaseToIDs(const char *db_name, const char *tab_name, uint32_t *db_id, uint32_t *tab_id, bool create)
1712
db = MSDatabase::getDatabase(db_name, create);
1715
*db_id = db->myDatabaseID;
1718
tab = db->getTable(tab_name, create);
1720
*tab_id = tab->myTableID;
1728
return_((*tab_id > 0) && (*db_id > 0));
1731
// Loads (or creates) the database db_name below the MySQL home path and
// registers it globally: a retained reference goes into gDatabaseList and
// another into gDatabaseArray under the database's ID. The system tables
// of the database are loaded afterwards. The database ID passed to
// newDatabase() is 0.
MSDatabase *MSDatabase::loadDatabase(CSString *db_name, bool create)
1736
db = newDatabase(ms_my_get_mysql_home_path(), db_name, 0, create);
1741
gDatabaseList->add(RETAIN(db));
1743
gDatabaseArray->set(db->myDatabaseID, RETAIN(db));
1745
PBMSSystemTables::loadSystemTables(RETAIN(db));
1752
// Returns the ID of the database db_name. Under the gDatabaseList lock
// the database is looked up in the list; when it is not there it is
// loaded via loadDatabase() with the 'create' flag. In both cases 'id' is
// taken from the database's myDatabaseID.
uint32_t MSDatabase::getDatabaseID(CSString *db_name, bool create)
1760
lock_(gDatabaseList);
1761
if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1762
db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1765
id = db->myDatabaseID;
1768
id = db->myDatabaseID;
1771
unlock_(gDatabaseList);
1777
// Convenience overload: wraps db_name in a new CSString and forwards to
// getDatabaseID(CSString *, bool).
uint32_t MSDatabase::getDatabaseID(const char *db_name, bool create)
1779
return getDatabaseID(CSString::newString(db_name), create);
1782
MSDatabase *MSDatabase::getBackupDatabase(CSString *db_location, CSString *db_name, uint32_t db_id, bool create)
1784
bool was_created = create;
1791
// If the db already exists and 'create' == true then the existing db
1793
// Create the database path, if 'create' == false then it can return NULL
1794
path = createDatabasePath(db_location->getCString(), RETAIN(db_name), &db_id, &was_created);
1796
CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1800
// If we wanted to create it but it already exists then throw an error.
1801
if ( create && !was_created) {
1803
snprintf(str, 120, "Duplicate database: %s", db_name->getCString());
1804
CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, str);
1809
// everything looks OK
1810
db = newDatabase(db_location->getCString(), db_name, db_id, create);
1811
db->setBackupDatabase();
1812
release_(db_location);