~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/Database_ms.cc

Added the PBMS daemon plugin.

(Augen zu und durch!)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-05-25
 
23
 *
 
24
 * H&G2JCtL
 
25
 *
 
26
 * Network interface.
 
27
 *
 
28
 */
 
29
 
 
30
#include "CSConfig.h"
 
31
 
 
32
#include <string.h>
 
33
#include <ctype.h>
 
34
 
 
35
#include "string.h"
 
36
 
 
37
#include "CSGlobal.h"
 
38
#include "CSLog.h"
 
39
#include "CSDirectory.h"
 
40
#include "CSStrUtil.h"
 
41
 
 
42
#include "Database_ms.h"
 
43
#include "OpenTable_ms.h"
 
44
#include "backup_ms.h"
 
45
#include "Table_ms.h"
 
46
#include "Util_ms.h"
 
47
#include "TempLog_ms.h"
 
48
#include "Network_ms.h"
 
49
#include "ms_mysql.h"
 
50
#include "pbmslib.h"
 
51
#include "Transaction_ms.h"
 
52
//#include "SysTab_variable.h"
 
53
#include "SysTab_httpheader.h"
 
54
 
 
55
 
 
56
 
 
57
uint64_t                                MSDatabase::gRepoThreshold;
 
58
uint64_t                                MSDatabase::gTempLogThreshold;
 
59
CSSyncSortedList        *MSDatabase::gDatabaseList;
 
60
CSSparseArray           *MSDatabase::gDatabaseArray;
 
61
uint64_t                                MSDatabase::gBackupDatabaseID;
 
62
/*
 
63
 * -------------------------------------------------------------------------
 
64
 * PBMS DATABASES
 
65
 */
 
66
 
 
67
MSDatabase::MSDatabase():
 
68
myDatabasePath(NULL),
 
69
myDatabaseName(NULL),
 
70
myTempLogArray(NULL),
 
71
myCompactorThread(NULL),
 
72
myTempLogThread(NULL),
 
73
myRepostoryList(NULL),
 
74
myBlobCloud(NULL),
 
75
myBlobType(MS_STANDARD_STORAGE),
 
76
iTableList(NULL),
 
77
iTableArray(NULL),
 
78
iMaxTableID(0),
 
79
#ifdef HAVE_ALIAS_SUPPORT
 
80
iBlobAliases(NULL),
 
81
#endif
 
82
iDropping(false),
 
83
iRecovering(false),
 
84
isBackup(false),
 
85
iWriteTempLog(NULL),
 
86
iBackupThread(NULL),
 
87
iNextBlobRefId(0),
 
88
iClosing(false)
 
89
{
 
90
}
 
91
 
 
92
MSDatabase::~MSDatabase()
 
93
{
 
94
        iClosing = true;
 
95
        if (iBackupThread) {
 
96
                iBackupThread->stop();
 
97
                iBackupThread->release();
 
98
                iBackupThread = NULL;
 
99
        }
 
100
        
 
101
        if (myTempLogThread) {
 
102
                myTempLogThread->stop();
 
103
                myTempLogThread->release();
 
104
                myTempLogThread = NULL;
 
105
        }
 
106
        if (myCompactorThread) {
 
107
                myRepostoryList->wakeup(); // The compator thread waits on this.
 
108
                myCompactorThread->stop();
 
109
                myCompactorThread->release();
 
110
                myCompactorThread = NULL;
 
111
        }
 
112
        
 
113
        if (myDatabasePath)
 
114
                myDatabasePath->release();
 
115
                
 
116
        if (myDatabaseName)
 
117
                myDatabaseName->release();
 
118
                
 
119
        iWriteTempLog = NULL;
 
120
        if (myTempLogArray) {
 
121
                myTempLogArray->clear();
 
122
                myTempLogArray->release();
 
123
        }
 
124
        if (iTableList) {
 
125
                iTableList->clear();
 
126
                iTableList->release();
 
127
        }
 
128
        if (iTableArray){
 
129
                iTableArray->clear();
 
130
                iTableArray->release();
 
131
        }
 
132
        if (myRepostoryList) {
 
133
                myRepostoryList->clear();
 
134
                myRepostoryList->release();
 
135
        }
 
136
#ifdef HAVE_ALIAS_SUPPORT
 
137
        if (iBlobAliases) {
 
138
                iBlobAliases->ma_close();
 
139
                iBlobAliases->release();
 
140
        }
 
141
#endif
 
142
        if (myBlobCloud) {
 
143
                myBlobCloud->release();
 
144
        }
 
145
}
 
146
 
 
147
const char *MSDatabase::getDatabaseNameCString()
 
148
{
 
149
        return myDatabaseName->getCString();
 
150
}
 
151
 
 
152
MSTable *MSDatabase::getTable(CSString *tab_name, bool create)
 
153
{
 
154
        MSTable *tab;
 
155
        
 
156
        enter_();
 
157
        push_(tab_name);
 
158
        lock_(iTableList);
 
159
        if (!(tab = (MSTable *) iTableList->find(tab_name))) {
 
160
 
 
161
                if (create) {
 
162
                        /* Create a new table: */
 
163
                        tab = MSTable::newTable(iMaxTableID+1, RETAIN(tab_name), this, (off_t) 0, false);
 
164
                        iTableList->add(tab);
 
165
                        iTableArray->set(iMaxTableID+1, RETAIN(tab));
 
166
                        iMaxTableID++;
 
167
                }
 
168
        }
 
169
        if (tab)
 
170
                tab->retain();
 
171
        unlock_(iTableList);
 
172
        release_(tab_name);
 
173
        return_(tab);
 
174
}
 
175
 
 
176
MSTable *MSDatabase::getTable(const char *tab_name, bool create)
 
177
{
 
178
        return getTable(CSString::newString(tab_name), create);
 
179
}
 
180
 
 
181
 
 
182
MSTable *MSDatabase::getTable(uint32_t tab_id, bool missing_ok)
 
183
{
 
184
        MSTable *tab;
 
185
        
 
186
        enter_();
 
187
        lock_(iTableList);
 
188
        if (!(tab = (MSTable *) iTableArray->get((u_int) tab_id))) {
 
189
                if (missing_ok) {
 
190
                        unlock_(iTableList);
 
191
                        return_(NULL);
 
192
                }
 
193
                char buffer[CS_EXC_MESSAGE_SIZE];
 
194
 
 
195
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown table #");
 
196
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (u_int) tab_id);
 
197
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " in database ");
 
198
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, getDatabaseNameCString());
 
199
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, buffer);
 
200
        }
 
201
        tab->retain();
 
202
        unlock_(iTableList);
 
203
        return_(tab);
 
204
}
 
205
 
 
206
MSTable *MSDatabase::getNextTable(uint32_t *pos)
 
207
{
 
208
        u_int i = *pos;
 
209
        MSTable *tab = NULL;
 
210
        
 
211
        enter_();
 
212
        lock_(iTableList);
 
213
        while (i < iTableList->getSize()) {
 
214
                tab = (MSTable *) iTableList->itemAt(i++);
 
215
                if (!tab->isToDelete())
 
216
                        break;
 
217
                tab = NULL;
 
218
        }
 
219
        if (tab)
 
220
                tab->retain();
 
221
        unlock_(iTableList);
 
222
        *pos = i;
 
223
        return_(tab);
 
224
}
 
225
 
 
226
void MSDatabase::addTable(uint32_t tab_id, const char *tab_name, off_t file_size, bool to_delete)
 
227
{
 
228
        MSTable *tab;
 
229
 
 
230
        if (tab_id > iMaxTableID)
 
231
                iMaxTableID = tab_id;
 
232
        tab = MSTable::newTable(tab_id, tab_name, this, file_size, to_delete);
 
233
        iTableList->add(tab);
 
234
        iTableArray->set(tab_id, RETAIN(tab));
 
235
}
 
236
 
 
237
void MSDatabase::addTableFromFile(CSDirectory *dir, const char *file_name, bool to_delete)
 
238
{
 
239
        off_t   file_size;
 
240
        u_int   file_id;
 
241
        char    tab_name[MS_TABLE_NAME_SIZE];
 
242
 
 
243
        dir->info(NULL, &file_size, NULL);
 
244
        file_id = ms_file_to_table_id(file_name);
 
245
        ms_file_to_table_name(MS_TABLE_NAME_SIZE, tab_name, file_name);
 
246
        addTable(file_id, tab_name, file_size, to_delete);
 
247
}
 
248
 
 
249
void MSDatabase::removeTable(MSTable *tab)
 
250
{
 
251
        enter_();
 
252
        push_(tab);
 
253
        lock_(iTableList);
 
254
        iTableList->remove(tab->myTableName);
 
255
        iTableArray->remove(tab->myTableID);
 
256
        unlock_(iTableList);
 
257
        release_(tab);
 
258
        exit_();
 
259
}
 
260
 
 
261
void MSDatabase::dropTable(MSTable *tab)
 
262
{
 
263
        enter_();
 
264
        push_(tab);
 
265
        lock_(iTableList);
 
266
        iTableList->remove(tab->myTableName);
 
267
        iTableArray->remove(tab->myTableID);
 
268
 
 
269
        // Cute: you drop the table by adding it with the 'to_delete' flag set to 'true'
 
270
        addTable(tab->myTableID, tab->myTableName->getCString(), tab->getTableFileSize(), true);
 
271
 
 
272
        unlock_(iTableList);
 
273
        release_(tab);
 
274
        exit_();
 
275
}
 
276
 
 
277
// This function is used when dropping tables from a database before
 
278
// dropping the database itself. 
 
279
CSString *MSDatabase::getATableName()
 
280
{
 
281
        u_int i = 0;
 
282
        MSTable *tab;
 
283
        CSString *name = NULL;
 
284
        
 
285
        enter_();
 
286
        lock_(iTableList);
 
287
 
 
288
        while ((tab = (MSTable *) iTableList->itemAt(i++)) && tab->isToDelete()) ;
 
289
        if (tab) {
 
290
                name = tab->getTableName();
 
291
                name->retain();
 
292
        }
 
293
        unlock_(iTableList);
 
294
        return_(name);
 
295
}
 
296
 
 
297
u_int MSDatabase::getTableCount()
 
298
{
 
299
        u_int cnt = 0, i = 0;
 
300
        MSTable *tab;
 
301
        
 
302
        enter_();
 
303
        lock_(iTableList);
 
304
 
 
305
        while ((tab = (MSTable *) iTableList->itemAt(i++))) {
 
306
                if (!tab->isToDelete())
 
307
                        cnt++;
 
308
        }
 
309
 
 
310
        unlock_(iTableList);
 
311
        return_(cnt);
 
312
}
 
313
 
 
314
 
 
315
void MSDatabase::renameTable(MSTable *tab, const char *to_name)
 
316
{
 
317
        enter_();
 
318
        lock_(iTableList);
 
319
        iTableList->remove(tab->myTableName);
 
320
        iTableArray->remove(tab->myTableID);
 
321
 
 
322
        addTable(tab->myTableID, to_name, tab->getTableFileSize(), false);
 
323
 
 
324
        unlock_(iTableList);
 
325
        exit_();
 
326
}
 
327
 
 
328
void MSDatabase::openWriteRepo(MSOpenTable *otab)
 
329
{
 
330
        if (otab->myWriteRepo && otab->myWriteRepoFile)
 
331
                return;
 
332
 
 
333
        enter_();
 
334
        if (!otab->myWriteRepo)
 
335
                otab->myWriteRepo = lockRepo(0);
 
336
 
 
337
        /* Now open the repo file for the open table: */
 
338
        otab->myWriteRepo->openRepoFileForWriting(otab);
 
339
        exit_();
 
340
}
 
341
 
 
342
MSRepository *MSDatabase::getRepoFullOfTrash(time_t *ret_wait_time)
 
343
{
 
344
        MSRepository    *repo = NULL;
 
345
        time_t                  wait_time = 0;
 
346
        
 
347
        if (ret_wait_time)
 
348
                wait_time = *ret_wait_time;
 
349
        enter_();
 
350
        lock_(myRepostoryList);
 
351
        for (u_int i=0; i<myRepostoryList->size(); i++) {
 
352
                retry:
 
353
                if ((repo = (MSRepository *) myRepostoryList->get(i))) {
 
354
                        if (!repo->isRemovingFP && !repo->mustBeDeleted && !repo->isRepoLocked()) {
 
355
                                if (!repo->myRepoHeadSize) {
 
356
                                        /* The file has not yet been opened, so the
 
357
                                         * garbage count will not be known!
 
358
                                         */
 
359
                                        MSRepoFile *repo_file;
 
360
 
 
361
                                        repo->retain();
 
362
                                        unlock_(myRepostoryList);
 
363
                                        push_(repo);
 
364
                                        repo_file = repo->openRepoFile();
 
365
                                        repo_file->release();
 
366
                                        release_(repo);
 
367
                                        lock_(myRepostoryList);
 
368
                                        goto retry;
 
369
                                }
 
370
                                if (repo->getGarbageLevel() >= MSRepository::gGarbageThreshold) {
 
371
                                        /* Make sure there are not temp BLOBs in this repository that have
 
372
                                         * not yet timed out:
 
373
                                         */
 
374
                                        time_t now = time(NULL);
 
375
                                        time_t then = repo->myLastTempTime;
 
376
 
 
377
                                        /* Check if there are any temp BLOBs to be removed: */
 
378
                                        if (now > then + MSTempLog::gTempBlobTimeout) {
 
379
                                                repo->lockRepo(REPO_COMPACTING); 
 
380
                                                repo->retain();
 
381
                                                break;
 
382
                                        }
 
383
                                        else {
 
384
                                                /* There are temp BLOBs to wait for... */
 
385
                                                if (!wait_time || wait_time > MSTempLog::adjustWaitTime(then, now))
 
386
                                                        wait_time = MSTempLog::adjustWaitTime(then, now);
 
387
                                        }
 
388
                                }
 
389
                        }
 
390
                        repo = NULL;
 
391
                }
 
392
        }
 
393
        unlock_(myRepostoryList);
 
394
        if (ret_wait_time)
 
395
                *ret_wait_time = wait_time;
 
396
        return_(repo);
 
397
}
 
398
 
 
399
MSRepository *MSDatabase::lockRepo(off_t size)
 
400
{
 
401
        MSRepository    *repo;
 
402
        u_int                   free_slot;
 
403
 
 
404
        enter_();
 
405
        lock_(myRepostoryList);
 
406
        free_slot = myRepostoryList->size();
 
407
        /* Find an unlocked repository file that is below the write threshold: */
 
408
        for (u_int i=0; i<myRepostoryList->size(); i++) {
 
409
                if ((repo = (MSRepository *) myRepostoryList->get(i))) {
 
410
                        if (!repo->isRepoLocked() && !repo->isRemovingFP && !repo->mustBeDeleted &&
 
411
                                repo->myRepoFileSize + size < gRepoThreshold 
 
412
                                /**/ && repo->getGarbageLevel() < MSRepository::gGarbageThreshold)
 
413
                                goto found1;
 
414
                }
 
415
                else {
 
416
                        if (i < free_slot)
 
417
                                free_slot = i;
 
418
                }
 
419
        }
 
420
 
 
421
        /* None found, create a new repo file: */
 
422
        new_(repo, MSRepository(free_slot + 1, this, 0));
 
423
        myRepostoryList->set(free_slot, repo);
 
424
 
 
425
        found1:
 
426
        repo->retain();
 
427
        repo->lockRepo(REPO_WRITE);  // <- The MSRepository::backToPool() will unlock this.
 
428
        unlock_(myRepostoryList);
 
429
        return_(repo);
 
430
}
 
431
 
 
432
MSRepoFile *MSDatabase::getRepoFileFromPool(uint32_t repo_id, bool missing_ok)
 
433
{
 
434
        MSRepository    *repo;
 
435
        MSRepoFile              *file;
 
436
 
 
437
        enter_();
 
438
        lock_(myRepostoryList);
 
439
        if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1))) {
 
440
                if (!missing_ok) {
 
441
                        char buffer[CS_EXC_MESSAGE_SIZE];
 
442
 
 
443
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown repository file: ");
 
444
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (u_int) repo_id);
 
445
                        CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
 
446
                }
 
447
                unlock_(myRepostoryList);
 
448
                return_(NULL);
 
449
        }
 
450
        if (repo->isRemovingFP) {
 
451
                char buffer[CS_EXC_MESSAGE_SIZE];
 
452
 
 
453
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Repository will be removed: ");
 
454
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (u_int) repo_id);
 
455
                CSException::throwException(CS_CONTEXT, MS_ERR_REMOVING_REPO, buffer);
 
456
        }
 
457
        repo->retain(); /* Release is here: [++] */
 
458
        file = repo->getRepoFile();
 
459
        unlock_(myRepostoryList);
 
460
 
 
461
        if (!file) {
 
462
                file = repo->openRepoFile();
 
463
                lock_(myRepostoryList);
 
464
                repo->addRepoFile(file);
 
465
                file->retain();
 
466
                unlock_(myRepostoryList);
 
467
        }
 
468
        return_(file);
 
469
}
 
470
 
 
471
void MSDatabase::returnRepoFileToPool(MSRepoFile *file)
 
472
{
 
473
        MSRepository    *repo;
 
474
 
 
475
        enter_();
 
476
        lock_(myRepostoryList);
 
477
        push_(file);
 
478
        if ((repo = file->myRepo)) {
 
479
                if (repo->isRemovingFP) {
 
480
                        repo->removeRepoFile(file);
 
481
                        myRepostoryList->wakeup();
 
482
                }
 
483
                else
 
484
                        repo->returnRepoFile(file);
 
485
                repo->release(); /* [++] here is the release.  */
 
486
        }
 
487
        release_(file);
 
488
        unlock_(myRepostoryList);
 
489
        exit_();
 
490
}
 
491
 
 
492
void MSDatabase::removeRepo(uint32_t repo_id, bool *mustQuit)
 
493
{
 
494
        MSRepository *repo;
 
495
 
 
496
        enter_();
 
497
        lock_(myRepostoryList);
 
498
        while ((!mustQuit || !*mustQuit) && !iClosing) {
 
499
                if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1)))
 
500
                        break;
 
501
                repo->isRemovingFP = true;
 
502
                if (repo->removeRepoFilesNotInUse()) {
 
503
                        myRepostoryList->set(repo_id - 1, NULL);
 
504
                        break;
 
505
                }
 
506
                /*
 
507
                 * Wait for the files that are in use to be
 
508
                 * freed.
 
509
                 */
 
510
                myRepostoryList->wait();
 
511
        }
 
512
        unlock_(myRepostoryList);
 
513
        exit_();
 
514
}
 
515
 
 
516
void MSDatabase::queueTempLogEvent(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
 
517
         uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time)
 
518
{
 
519
        MSTempLogItemRec        item;
 
520
        uint32_t                                timev;
 
521
 
 
522
        // Each otab object holds an handle to an instance of an OPEN
 
523
        // temp log. This is so that each thread has it's own open temp log 
 
524
        // and doesn't need to be opened and close it constantly.
 
525
        
 
526
        enter_();
 
527
        lock_(myTempLogArray);
 
528
        if (!iWriteTempLog) {
 
529
                iWriteTempLog = (MSTempLog *) myTempLogArray->last();
 
530
                if (!iWriteTempLog) {
 
531
                        new_(iWriteTempLog, MSTempLog(1, this, 0));
 
532
                        myTempLogArray->set(1, iWriteTempLog);
 
533
                }
 
534
        }
 
535
        if (!otab->myTempLogFile)
 
536
                otab->myTempLogFile = iWriteTempLog->openTempLog();
 
537
        else if (otab->myTempLogFile->myTempLogID != iWriteTempLog->myLogID) {
 
538
                otab->myTempLogFile->release();
 
539
                otab->myTempLogFile = NULL;
 
540
                otab->myTempLogFile = iWriteTempLog->openTempLog();
 
541
        }
 
542
 
 
543
        if (iWriteTempLog->myTempLogSize >= gTempLogThreshold) {
 
544
                u_int log_id = iWriteTempLog->myLogID + 1;
 
545
 
 
546
                new_(iWriteTempLog, MSTempLog(log_id, this, 0));
 
547
                myTempLogArray->set(log_id, iWriteTempLog);
 
548
 
 
549
                otab->myTempLogFile->release();
 
550
                otab->myTempLogFile = NULL;
 
551
                otab->myTempLogFile = iWriteTempLog->openTempLog();
 
552
 
 
553
        }
 
554
 
 
555
        timev = time(NULL);
 
556
        *log_id = iWriteTempLog->myLogID;
 
557
        *log_offset = (uint32_t) iWriteTempLog->myTempLogSize;
 
558
        if (q_time)
 
559
                *q_time = timev;
 
560
        iWriteTempLog->myTempLogSize += iWriteTempLog->myTemplogRecSize;
 
561
        unlock_(myTempLogArray);
 
562
 
 
563
        CS_SET_DISK_1(item.ti_type_1, type);
 
564
        CS_SET_DISK_4(item.ti_table_id_4, tab_id);
 
565
        CS_SET_DISK_6(item.ti_blob_id_6, blob_id);
 
566
        CS_SET_DISK_4(item.ti_auth_code_4, auth_code);
 
567
        CS_SET_DISK_4(item.ti_time_4, timev);
 
568
        otab->myTempLogFile->write(&item, *log_offset, sizeof(MSTempLogItemRec));
 
569
        
 
570
        
 
571
        exit_();
 
572
}
 
573
 
 
574
#ifdef HAVE_ALIAS_SUPPORT
 
575
void MSDatabase::queueForDeletion(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
 
576
         uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time, MSDiskAliasPtr aliasDiskRec)
 
577
{
 
578
        enter_();
 
579
        
 
580
        queueTempLogEvent(otab, type, tab_id, blob_id, auth_code, log_id, log_offset, q_time);
 
581
                
 
582
        // If it has an alias remove it from the ailias index.
 
583
        if (aliasDiskRec) {
 
584
                try_(a) {
 
585
                        deleteBlobAlias(aliasDiskRec);
 
586
                }
 
587
                catch_(a);
 
588
                self->logException();
 
589
                cont_(a);
 
590
        }
 
591
        
 
592
        exit_();
 
593
}
 
594
#endif
 
595
 
 
596
MSTempLogFile *MSDatabase::openTempLogFile(uint32_t log_id, size_t *log_rec_size, size_t *log_head_size)
 
597
{
 
598
        MSTempLog               *log;
 
599
        MSTempLogFile   *log_file = NULL;
 
600
 
 
601
        enter_();
 
602
        lock_(myTempLogArray);
 
603
        if (log_id)
 
604
                log = (MSTempLog *) myTempLogArray->get(log_id);
 
605
        else
 
606
                log = (MSTempLog *) myTempLogArray->first();
 
607
        if (log) {
 
608
                log_file = log->openTempLog();
 
609
                if (log_rec_size)
 
610
                        *log_rec_size = log->myTemplogRecSize;
 
611
                if (log_head_size)
 
612
                        *log_head_size = log->myTempLogHeadSize;
 
613
        }
 
614
        unlock_(myTempLogArray);
 
615
        return_(log_file);
 
616
}
 
617
 
 
618
u_int MSDatabase::getTempLogCount()
 
619
{
 
620
        u_int count;
 
621
 
 
622
        enter_();
 
623
        lock_(myTempLogArray);
 
624
        count = myTempLogArray->size();
 
625
        unlock_(myTempLogArray);
 
626
        return_(count);
 
627
}
 
628
 
 
629
void MSDatabase::removeTempLog(uint32_t log_id)
 
630
{
 
631
        enter_();
 
632
        lock_(myTempLogArray);
 
633
        myTempLogArray->remove(log_id);
 
634
        unlock_(myTempLogArray);
 
635
        exit_();
 
636
}
 
637
 
 
638
CSObject *MSDatabase::getKey()
 
639
{
 
640
        return (CSObject *) myDatabaseName;
 
641
}
 
642
 
 
643
int MSDatabase::compareKey(CSObject *key)
 
644
{
 
645
        return myDatabaseName->compare((CSString *) key);
 
646
}
 
647
 
 
648
MSCompactorThread *MSDatabase::getCompactorThread()
 
649
{
 
650
        return myCompactorThread;
 
651
}
 
652
 
 
653
CSSyncVector *MSDatabase::getRepositoryList()
 
654
{       
 
655
        return myRepostoryList;
 
656
}
 
657
 
 
658
#ifdef HAVE_ALIAS_SUPPORT
 
659
uint32_t MSDatabase::registerBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
 
660
{
 
661
        uint32_t hash;
 
662
        bool can_retry = true;
 
663
        enter_();
 
664
        
 
665
retry:
 
666
        lock_(&iBlobAliaseLock);
 
667
        
 
668
        try_(a) {
 
669
                hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
 
670
        }
 
671
        
 
672
        catch_(a) {
 
673
                unlock_(&iBlobAliaseLock);
 
674
                if (can_retry) {
 
675
                        // It can be that a duplicater alias exists that was deleted
 
676
                        // but the transaction has not been written to the repository yet.
 
677
                        // Flush all committed transactions to the repository file.
 
678
                        MSTransactionManager::flush();
 
679
                        can_retry = false;
 
680
                        goto retry;
 
681
                }
 
682
                throw_();
 
683
        }
 
684
        
 
685
        cont_(a);
 
686
        unlock_(&iBlobAliaseLock);
 
687
        return_(hash);
 
688
}
 
689
 
 
690
uint32_t MSDatabase::updateBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t old_alias_hash, const char *alias)
 
691
{
 
692
        uint32_t new_hash;
 
693
        enter_();
 
694
        lock_(&iBlobAliaseLock);
 
695
        
 
696
        new_hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
 
697
        iBlobAliases->deleteAlias(repo_id, repo_offset, old_alias_hash);
 
698
 
 
699
        unlock_(&iBlobAliaseLock);
 
700
        return_(new_hash);
 
701
}
 
702
 
 
703
void MSDatabase::deleteBlobAlias(MSDiskAliasPtr diskRec)
 
704
{
 
705
        enter_();
 
706
        lock_(&iBlobAliaseLock);
 
707
        iBlobAliases->deleteAlias(diskRec);
 
708
        unlock_(&iBlobAliaseLock);
 
709
        exit_();
 
710
}
 
711
 
 
712
void MSDatabase::deleteBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
 
713
{
 
714
        MSDiskAliasRec diskRec;
 
715
        
 
716
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
 
717
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
 
718
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
 
719
        deleteBlobAlias(&diskRec);
 
720
}
 
721
 
 
722
void MSDatabase::moveBlobAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
 
723
{
 
724
        enter_();
 
725
        lock_(&iBlobAliaseLock);
 
726
        iBlobAliases->resetAlias(old_repo_id, old_repo_offset, alias_hash, new_repo_id, new_repo_offset);
 
727
        unlock_(&iBlobAliaseLock);
 
728
        exit_();
 
729
}
 
730
#endif
 
731
 
 
732
bool MSDatabase::isValidHeaderField(const char *name)
 
733
{
 
734
        bool is_valid = false;
 
735
        CSString                *header;
 
736
        enter_();
 
737
 
 
738
        if (name && *name) {
 
739
                if (strcasecmp(name, MS_ALIAS_TAG)) {
 
740
                        lock_(&iHTTPMetaDataHeaders);
 
741
                        header = CSString::newString(name);
 
742
                        push_(header);
 
743
                                
 
744
                        is_valid = (iHTTPMetaDataHeaders.find(header) != NULL);
 
745
                        release_(header);
 
746
                        
 
747
                        unlock_(&iHTTPMetaDataHeaders);
 
748
                } else 
 
749
                        is_valid = true;
 
750
        }
 
751
        
 
752
        return_(is_valid);
 
753
}
 
754
 
 
755
void MSDatabase::startUp(const char *default_http_headers)
 
756
{
 
757
        enter_();
 
758
        
 
759
        new_(gDatabaseList, CSSyncSortedList);
 
760
        new_(gDatabaseArray, CSSparseArray(5));
 
761
        MSHTTPHeaderTable::setDefaultMetaDataHeaders(default_http_headers);
 
762
        pbmsSystemTablesStartUp();
 
763
        gBackupDatabaseID = 1;
 
764
        exit_();
 
765
}
 
766
 
 
767
void MSDatabase::stopThreads()
 
768
{
 
769
        MSDatabase *db;
 
770
 
 
771
        enter_();
 
772
        if (gDatabaseList) {
 
773
                lock_(gDatabaseList);
 
774
                for (int i=0;;i++) {
 
775
                        if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
 
776
                                break;
 
777
                        db->iClosing = true;
 
778
                        
 
779
                        if (db->myTempLogThread) {
 
780
                                db->myTempLogThread->stop();
 
781
                                db->myTempLogThread->release();
 
782
                                db->myTempLogThread = NULL;
 
783
                        }
 
784
                        if (db->myCompactorThread) {
 
785
                                db->myRepostoryList->wakeup(); // The compator thread waits on this.
 
786
                                db->myCompactorThread->stop();
 
787
                                db->myCompactorThread->release();
 
788
                                db->myCompactorThread = NULL;
 
789
                        }
 
790
                        
 
791
                        if (db->iBackupThread) {
 
792
                                db->iBackupThread->stop();
 
793
                                db->iBackupThread->release();
 
794
                                db->iBackupThread = NULL;
 
795
                        }
 
796
        
 
797
                }
 
798
                
 
799
                unlock_(gDatabaseList);
 
800
        }
 
801
        exit_();
 
802
}
 
803
 
 
804
void MSDatabase::shutDown()
 
805
{
 
806
                
 
807
        if (gDatabaseArray) {
 
808
                gDatabaseArray->clear();
 
809
                gDatabaseArray->release();
 
810
                gDatabaseArray = NULL;
 
811
        }
 
812
        
 
813
        if (gDatabaseList) {
 
814
                gDatabaseList->clear();
 
815
                gDatabaseList->release();
 
816
                gDatabaseList = NULL;
 
817
        }
 
818
        
 
819
        MSHTTPHeaderTable::releaseDefaultMetaDataHeaders();
 
820
        pbmsSystemTableShutDown();
 
821
}
 
822
 
 
823
void MSDatabase::setBackupDatabase()
 
824
{
 
825
        enter_();
 
826
        // I need to give the backup database a unique fake database ID.
 
827
        // This is so that it is not confused with the database being backed
 
828
        // backed up when opening tables.
 
829
        
 
830
        // Normally database IDs are generated by time(NULL) so small database IDs
 
831
        // are safe to use as fake IDs.
 
832
         
 
833
        lock_(gDatabaseList);
 
834
        myDatabaseID = gBackupDatabaseID++;
 
835
        gDatabaseArray->set(myDatabaseID, RETAIN(this));
 
836
        isBackup = true;
 
837
        
 
838
        // Notify the cloud storage, if any, that it is a backup.
 
839
        // This is important because if the backup database is dropped
 
840
        // we need to be sure that only the BLOBs belonging to the
 
841
        // backup are removed from the cloud.
 
842
        myBlobCloud->cl_setCloudIsBackup(); 
 
843
        
 
844
        unlock_(gDatabaseList);
 
845
        
 
846
        // Rename the database path so that it is obviouse that this is an incomplete backup database.
 
847
        // When the backup is completed it will be renamed back.
 
848
        CSPath *new_path = CSPath::newPath(myDatabasePath->concat("#"));
 
849
        push_(new_path);
 
850
        
 
851
        if (new_path->exists()) 
 
852
                new_path->remove();
 
853
        
 
854
        CSPath *db_path = CSPath::newPath(RETAIN(myDatabasePath));
 
855
        push_(db_path);
 
856
        
 
857
        db_path->rename(new_path->getNameCString());
 
858
        myDatabasePath->release();
 
859
        myDatabasePath = new_path->getString();
 
860
        myDatabasePath->retain();
 
861
        
 
862
        release_(db_path);
 
863
        release_(new_path);
 
864
        
 
865
        
 
866
        exit_();
 
867
}
 
868
 
 
869
void MSDatabase::releaseBackupDatabase()
 
870
{
 
871
        enter_();
 
872
 
 
873
        
 
874
        // The backup has completed succefully, rename the path to the correct name.
 
875
        CSPath *db_path = CSPath::newPath(myDatabasePath->getCString());
 
876
        push_(db_path);
 
877
        
 
878
        myDatabasePath->setLength(myDatabasePath->length()-1);
 
879
        db_path->rename(cs_last_name_of_path(myDatabasePath->getCString()));
 
880
        release_(db_path);
 
881
        
 
882
        // Remove the backup database object.
 
883
        lock_(gDatabaseList);
 
884
        gDatabaseArray->remove(myDatabaseID);
 
885
        MSTableList::removeDatabaseTables(this); // Will also release the database object.
 
886
        unlock_(gDatabaseList);
 
887
        
 
888
        
 
889
        exit_();
 
890
}
 
891
 
 
892
void MSDatabase::startBackup(MSBackupInfo *backup_info)
 
893
{
 
894
        enter_();
 
895
 
 
896
        push_(backup_info);
 
897
        if (iBackupThread) {
 
898
                if (iBackupThread->isRunning()) {
 
899
                        CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, "A backup is still running.");
 
900
                }
 
901
                iBackupThread->release();
 
902
                iBackupThread = NULL;
 
903
        }
 
904
        
 
905
        pop_(backup_info);
 
906
        iBackupThread = MSBackup::newMSBackup(backup_info);
 
907
        
 
908
        try_(a) {
 
909
                iBackupThread->startBackup(RETAIN(this));
 
910
        }
 
911
        
 
912
        catch_(a) {
 
913
                iBackupThread->release();
 
914
                iBackupThread = NULL;
 
915
                throw_();
 
916
        }
 
917
        cont_(a);
 
918
        
 
919
        exit_();
 
920
}
 
921
 
 
922
bool MSDatabase::backupStatus(uint64_t *total, uint64_t *completed, bool *completed_ok)
 
923
{
 
924
        bool done;
 
925
        
 
926
        enter_();
 
927
        
 
928
        if (iBackupThread) {
 
929
                *total = iBackupThread->getBackupSize();
 
930
                *completed = iBackupThread->getBackupCompletedSize();
 
931
                done = !iBackupThread->isRunning();
 
932
                *completed = (iBackupThread->getStatus() == 0);
 
933
        } else {
 
934
                *completed_ok = done = true;
 
935
                *total = *completed = 0;                        
 
936
        }
 
937
                
 
938
        return_(done);
 
939
}
 
940
 
 
941
uint32_t MSDatabase::backupID()
 
942
 
943
        return (iBackupThread)?iBackupThread->backupID(): 0;
 
944
}
 
945
 
 
946
void MSDatabase::terminateBackup()
 
947
{
 
948
        if (iBackupThread) {
 
949
                iBackupThread->stop();
 
950
                iBackupThread->release();
 
951
                iBackupThread = NULL;
 
952
        }
 
953
}
 
954
 
 
955
MSDatabase *MSDatabase::getDatabase(CSString *db_name, bool create)
 
956
{
 
957
        MSDatabase *db;
 
958
        enter_();
 
959
        push_(db_name);
 
960
        
 
961
        
 
962
        lock_(gDatabaseList);
 
963
        if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
 
964
                db = MSDatabase::loadDatabase(RETAIN(db_name), create);
 
965
                if (!db)
 
966
                        goto exit;
 
967
        } else
 
968
                db->retain();
 
969
        
 
970
        exit:
 
971
        unlock_(gDatabaseList);
 
972
        release_(db_name);
 
973
        return_(db);
 
974
}
 
975
 
 
976
MSDatabase *MSDatabase::getDatabase(const char *db_name, bool create)
 
977
{
 
978
        return getDatabase(CSString::newString(db_name), create);
 
979
}
 
980
 
 
981
MSDatabase *MSDatabase::getDatabase(uint32_t db_id)
 
982
{
 
983
        MSDatabase *db;
 
984
        
 
985
        enter_();
 
986
        lock_(gDatabaseList);
 
987
        if ((db = (MSDatabase *) gDatabaseArray->get((u_int) db_id))) 
 
988
                db->retain();
 
989
        else {
 
990
                // Look for the database folder with the correct ID:
 
991
                CSPath *path = CSPath::newPath(ms_my_get_mysql_home_path(), "pbms");
 
992
                push_(path);
 
993
                if (path->exists()) {
 
994
                        CSDirectory *dir;
 
995
                        dir = CSDirectory::newDirectory(RETAIN(path));
 
996
                        push_(dir);
 
997
                        dir->open();
 
998
                        
 
999
                        while (dir->next() && !db) {
 
1000
                                if (!dir->isFile()) {
 
1001
                                        const char *ptr, *dir_name  = dir->name();
 
1002
                                        ptr = dir_name + strlen(dir_name) -1;
 
1003
                                        
 
1004
                                        while (ptr > dir_name && *ptr != '-') ptr--;
 
1005
                                        
 
1006
                                        if (*ptr ==  '-') {
 
1007
                                                int len = ptr - dir_name;
 
1008
                                                ptr++;
 
1009
                                                if (atol(ptr) == db_id && len) {
 
1010
                                                        db = getDatabase(CSCString::newString(dir_name, len), true);
 
1011
                                                        ASSERT(db->myDatabaseID == db_id);
 
1012
                                                }
 
1013
                                        }
 
1014
                                }
 
1015
                        }
 
1016
                        release_(dir);
 
1017
                }
 
1018
                release_(path);         
 
1019
        }
 
1020
        unlock_(gDatabaseList);
 
1021
        
 
1022
        if (!db) {
 
1023
                char buffer[CS_EXC_MESSAGE_SIZE];
 
1024
 
 
1025
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database #");
 
1026
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (u_int) db_id);
 
1027
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
 
1028
        }
 
1029
        return_(db);
 
1030
}
 
1031
 
 
1032
void MSDatabase::wakeTempLogThreads()
 
1033
{
 
1034
        MSDatabase *db;
 
1035
 
 
1036
        if (!gDatabaseList)
 
1037
                return;
 
1038
        enter_();
 
1039
        lock_(gDatabaseList);
 
1040
        for (int i=0;;i++) {
 
1041
                if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
 
1042
                        break;
 
1043
                if (db->myTempLogThread)
 
1044
                        db->myTempLogThread->wakeup();
 
1045
        }
 
1046
        unlock_(gDatabaseList);
 
1047
        exit_();
 
1048
}
 
1049
 
 
1050
uint32_t MSDatabase::getDBID(CSPath *path, CSString *db_name)
 
1051
{
 
1052
        CSDirectory             *dir;
 
1053
        uint32_t                        db_id = 0;
 
1054
        int                             len = db_name->length();
 
1055
        const char              *ptr;
 
1056
        
 
1057
        enter_();
 
1058
        push_(db_name);
 
1059
        push_(path);
 
1060
        
 
1061
        // Search for the ID of the database
 
1062
        dir = CSDirectory::newDirectory(RETAIN(path));
 
1063
        push_(dir);
 
1064
        dir->open();
 
1065
        while (dir->next() && !db_id) 
 
1066
                {
 
1067
                if (!dir->isFile()){
 
1068
                        ptr = dir->name() + strlen(dir->name()) -1;
 
1069
                        while (ptr > dir->name() && isdigit(*ptr)) ptr--;
 
1070
                        if ((*ptr == '-') && (len == (ptr - dir->name())) && !db_name->compare(dir->name(), len) ) {
 
1071
                                db_id = atol(ptr+1);                            
 
1072
                        }
 
1073
                }
 
1074
        }
 
1075
        release_(dir);
 
1076
        
 
1077
        if (!db_id) {
 
1078
                db_id = time(NULL);
 
1079
 
 
1080
                while (1) { // search for a unique db_id
 
1081
                        dir = CSDirectory::newDirectory(RETAIN(path));
 
1082
                        push_(dir);
 
1083
                        dir->open();
 
1084
                        while (db_id && dir->next()) {
 
1085
                                if (!dir->isFile()) {
 
1086
                                        ptr = dir->name() + strlen(dir->name()) -1;
 
1087
                                        while (ptr > dir->name() && isdigit(*ptr)) ptr--;
 
1088
                                        if ((*ptr == '-') && (db_id == atol(ptr+1))) {
 
1089
                                                db_id = 0;                              
 
1090
                                        }
 
1091
                                }
 
1092
                        }
 
1093
                        release_(dir);
 
1094
                        if (db_id)
 
1095
                                break;
 
1096
                        sleep(1); // Allow 1 second to pass.
 
1097
                        db_id = time(NULL);
 
1098
                } 
 
1099
        }
 
1100
        
 
1101
        release_(path);
 
1102
        release_(db_name);
 
1103
        return_(db_id);
 
1104
}
 
1105
 
 
1106
CSPath *MSDatabase::createDatabasePath(const char *location, CSString *db_name, uint32_t *db_id_ptr, bool *create, bool is_pbms)
 
1107
{
 
1108
        bool create_path = *create;     
 
1109
        CSPath *path = NULL;
 
1110
        char name_buffer[MS_DATABASE_NAME_SIZE + 40];
 
1111
        uint32_t db_id;
 
1112
        enter_();
 
1113
        
 
1114
        push_(db_name);
 
1115
        *create = false;
 
1116
        path = CSPath::newPath(location, "pbms");
 
1117
        push_(path);
 
1118
        if (!path->exists()) {
 
1119
                if (!create_path){
 
1120
                        release_(path);
 
1121
                        path = NULL;
 
1122
                        goto done;
 
1123
                }
 
1124
                        
 
1125
                *create = true;
 
1126
                path->makeDir();
 
1127
        }
 
1128
 
 
1129
        // If this is the pbms database then nothing more is to be done.
 
1130
        if (is_pbms)
 
1131
                goto done;
 
1132
        
 
1133
        if ((!db_id_ptr) || !*db_id_ptr) {
 
1134
                db_id = getDBID(RETAIN(path), RETAIN(db_name));
 
1135
                if (db_id_ptr)
 
1136
                        *db_id_ptr = db_id;
 
1137
        } else
 
1138
                db_id = *db_id_ptr;
 
1139
 
 
1140
        // Create the PBMS database name with ID
 
1141
        cs_strcpy(MS_DATABASE_NAME_SIZE + 40, name_buffer, db_name->getCString());
 
1142
        cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, "-");
 
1143
        cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, (u_int) db_id);
 
1144
                        
 
1145
        pop_(path);
 
1146
        path = CSPath::newPath(path, name_buffer);
 
1147
        push_(path);
 
1148
        if (!path->exists()) {
 
1149
                if (create_path) {
 
1150
                        *create = true;
 
1151
                        path->makeDir();
 
1152
                } else {
 
1153
                        release_(path);
 
1154
                        path = NULL;
 
1155
                }
 
1156
                        
 
1157
        }
 
1158
        
 
1159
done:
 
1160
        if (path)
 
1161
                pop_(path);
 
1162
        release_(db_name);
 
1163
        return_(path);
 
1164
}
 
1165
 
 
1166
 
 
1167
 
 
1168
MSDatabase *MSDatabase::newDatabase(const char *db_location, CSString *db_name, uint32_t        db_id, bool create)
 
1169
{
 
1170
        MSDatabase              *db = NULL;
 
1171
        CSDirectory             *dir;
 
1172
        MSRepository    *repo;
 
1173
        CSPath                  *path;
 
1174
        const char              *file_name;
 
1175
        u_int                   file_id;
 
1176
        off_t                   file_size;
 
1177
        MSTempLog               *log;
 
1178
        u_int                   to_delete = 0;
 
1179
        CSString                *db_path;
 
1180
        bool                    is_pbms = false;
 
1181
 
 
1182
        enter_();
 
1183
 
 
1184
        push_(db_name);
 
1185
 
 
1186
        //is_pbms = (strcmp(db_name->getCString(), "pbms") == 0); To be done later.
 
1187
        
 
1188
        /* Block the creation of the pbms database if there is no MySQL database. */
 
1189
        path = CSPath::newPath(ms_my_get_mysql_home_path(), RETAIN(db_name));
 
1190
        push_(path);
 
1191
        if (create && !path->exists()) {
 
1192
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
 
1193
        }
 
1194
        release_(path);
 
1195
        
 
1196
         // Create the database path, if 'create' == false then it can return NULL
 
1197
        path = createDatabasePath(db_location, RETAIN(db_name), &db_id, &create, is_pbms);
 
1198
        if (!path) {
 
1199
                release_(db_name);
 
1200
                return_(NULL);
 
1201
        }
 
1202
        push_(path);
 
1203
        
 
1204
        // Create the database object and initialize it.
 
1205
        if (!(db = new MSDatabase())) {
 
1206
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
 
1207
        }
 
1208
        
 
1209
        db->myIsPBMS = is_pbms;
 
1210
        db_path = path->getString();
 
1211
        db_path->retain();
 
1212
        release_(path);
 
1213
        path = NULL;
 
1214
                
 
1215
        db->iNextBlobRefId = (uint32_t) time(NULL);
 
1216
        db->iNextBlobRefId <<= 32;
 
1217
        db->iNextBlobRefId = COMMIT_MASK(db->iNextBlobRefId);
 
1218
        db->iNextBlobRefId++;
 
1219
        
 
1220
        db->myDatabaseID = db_id;
 
1221
        db->myDatabasePath = db_path;
 
1222
        db->myDatabaseName = db_name;
 
1223
        new_(db->myBlobCloud, CloudDB(db_id));
 
1224
 
 
1225
        pop_(db_name);  
 
1226
        
 
1227
        push_(db);
 
1228
        
 
1229
        
 
1230
        new_(db->myTempLogArray, CSSyncSparseArray(20));
 
1231
        new_(db->iTableList, CSSyncSortedList());
 
1232
        new_(db->iTableArray, CSSparseArray(20));
 
1233
        new_(db->myRepostoryList, CSSyncVector(20));
 
1234
        
 
1235
        if (!is_pbms) { //
 
1236
#ifdef HAVE_ALIAS_SUPPORT
 
1237
                //db->retain(); no retain here, MSAlias() takes a back ref.
 
1238
                new_(db->iBlobAliases, MSAlias(db));
 
1239
#endif          
 
1240
                /* "Load" the database: */
 
1241
 
 
1242
                /* Get the max table ID: */
 
1243
                dir = CSDirectory::newDirectory(RETAIN(db_path));
 
1244
                push_(dir);
 
1245
                dir->open();
 
1246
                while (dir->next()) {
 
1247
                        file_name = dir->name();
 
1248
                        if (dir->isFile() && cs_is_extension(file_name, "bst"))
 
1249
                                db->addTableFromFile(dir, file_name, false);
 
1250
                }
 
1251
                release_(dir);
 
1252
 
 
1253
                path = CSPath::newPath(RETAIN(db_path), "bs-repository");
 
1254
                if (path->exists()) {
 
1255
                        dir = CSDirectory::newDirectory(path);
 
1256
                        push_(dir);
 
1257
                        dir->open();
 
1258
                        while (dir->next()) {
 
1259
                                file_name = dir->name();
 
1260
                                if (dir->isFile() && cs_is_extension(file_name, "bs")) {
 
1261
                                        if ((file_id = ms_file_to_table_id(file_name, "repo"))) {
 
1262
                                                dir->info(NULL, &file_size, NULL);
 
1263
                                                new_(repo, MSRepository(file_id, db, file_size));
 
1264
                                                db->myRepostoryList->set(file_id - 1, repo);
 
1265
                                        }
 
1266
                                }
 
1267
                        }
 
1268
                        release_(dir);
 
1269
                }
 
1270
                else {
 
1271
                        path->makeDir();
 
1272
                        path->release();
 
1273
                }
 
1274
 
 
1275
                path = CSPath::newPath(RETAIN(db_path), "bs-logs");
 
1276
                if (path->exists()) {
 
1277
                        dir = CSDirectory::newDirectory(path);
 
1278
                        push_(dir);
 
1279
                        dir->open();
 
1280
                        while (dir->next()) {
 
1281
                                file_name = dir->name();
 
1282
                                if (dir->isFile()) {
 
1283
                                        if (cs_is_extension(file_name, "bs")) {
 
1284
                                                if ((file_id = ms_file_to_table_id(file_name, "temp"))) {
 
1285
                                                        dir->info(NULL, &file_size, NULL);
 
1286
                                                        new_(log, MSTempLog(file_id, db, file_size));
 
1287
                                                        db->myTempLogArray->set(file_id, log);
 
1288
                                                }
 
1289
                                        }
 
1290
                                        else if (cs_is_extension(file_name, "bst")) {
 
1291
                                                db->addTableFromFile(dir, file_name, true);
 
1292
                                                to_delete++;
 
1293
                                        }
 
1294
                                }
 
1295
                        }
 
1296
                        release_(dir);
 
1297
                }
 
1298
                else {
 
1299
                        path->makeDir();
 
1300
                        path->release();
 
1301
                }
 
1302
 
 
1303
                if (to_delete) {
 
1304
                        /* Go through and prepare all the tables that are to
 
1305
                         * be deleted:
 
1306
                         */
 
1307
                        u_int   i = 0;
 
1308
                        MSTable *tab;
 
1309
                        
 
1310
                        while ((tab = (MSTable *) db->iTableList->itemAt(i))) {
 
1311
                                if (tab->isToDelete())
 
1312
                                        tab->prepareToDelete();
 
1313
                                i++;
 
1314
                        }
 
1315
                }
 
1316
 
 
1317
        }
 
1318
        pop_(db);
 
1319
 
 
1320
        return_(db);
 
1321
}
 
1322
 
 
1323
void MSDatabase::startThreads()
 
1324
{
 
1325
        enter_();
 
1326
 
 
1327
        if (myIsPBMS)
 
1328
                exit_();
 
1329
                
 
1330
#ifdef HAVE_ALIAS_SUPPORT
 
1331
        // iBlobAliases->ma_open() must be called before starting any threads.
 
1332
        iBlobAliases->ma_open(); 
 
1333
#endif
 
1334
 
 
1335
        new_(myTempLogThread, MSTempLogThread(1 * 1000, this));
 
1336
        myTempLogThread->start();
 
1337
 
 
1338
#ifdef MS_COMPACTOR_POLLS
 
1339
        new_(myCompactorThread, MSCompactorThread(MS_COMPACTOR_POLL_FREQ, this));
 
1340
#else
 
1341
        new_(myCompactorThread, MSCompactorThread(MS_DEFAULT_COMPACTOR_WAIT * 1000, this));
 
1342
#endif
 
1343
 
 
1344
        
 
1345
        myCompactorThread->start();
 
1346
        exit_();
 
1347
}
 
1348
 
 
1349
 
 
1350
void MSDatabase::dropDatabase() 
 
1351
{
 
1352
        enter_();
 
1353
 
 
1354
        iDropping = true;
 
1355
        iClosing = true;
 
1356
        
 
1357
        if (iBackupThread) {
 
1358
                iBackupThread->stop();
 
1359
                iBackupThread->release();
 
1360
                iBackupThread = NULL;
 
1361
        }
 
1362
        
 
1363
        if (myTempLogThread) {
 
1364
                myTempLogThread->stop();
 
1365
                myTempLogThread->release();
 
1366
                myTempLogThread = NULL;
 
1367
        }
 
1368
        
 
1369
        if (myCompactorThread) {
 
1370
                myRepostoryList->wakeup(); // The compator thread waits on this.
 
1371
                myCompactorThread->stop();
 
1372
                myCompactorThread->release();
 
1373
                myCompactorThread = NULL;
 
1374
        }
 
1375
        
 
1376
        // Call cloud drop database even if the database is not currrently
 
1377
        // using cloud storage just in case to was in the past. If the connection
 
1378
        // to the cloud is not setup then nothing will be done.
 
1379
        try_(a) {
 
1380
                myBlobCloud->cl_dropDB();
 
1381
        }
 
1382
        catch_(a) {
 
1383
                self->logException();
 
1384
        }
 
1385
        cont_(a);
 
1386
        exit_();
 
1387
}
 
1388
 
 
1389
void MSDatabase::removeDatabasePath(CSString *doomedDatabasePath ) 
 
1390
{
 
1391
        CSPath *path = NULL;
 
1392
        CSDirectory *dir = NULL;
 
1393
        const char *file_name;
 
1394
        enter_();
 
1395
        
 
1396
        push_(doomedDatabasePath);
 
1397
        
 
1398
        // Delete repository files
 
1399
        path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-repository");
 
1400
        push_(path);
 
1401
        if (path->exists()) {
 
1402
                dir = CSDirectory::newDirectory(RETAIN(path));
 
1403
                push_(dir);
 
1404
                dir->open();
 
1405
                while (dir->next()) {
 
1406
                        file_name = dir->name();
 
1407
                        if (dir->isFile() && cs_is_extension(file_name, "bs")) {
 
1408
                                dir->deleteEntry();
 
1409
                        }
 
1410
                }
 
1411
                release_(dir);
 
1412
                if (path->isEmpty())
 
1413
                        path->removeDir();
 
1414
        }
 
1415
        release_(path);
 
1416
 
 
1417
        // Delete temp log files.
 
1418
        path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-logs");
 
1419
        push_(path);
 
1420
        if (path->exists()) {
 
1421
                dir = CSDirectory::newDirectory(RETAIN(path));
 
1422
                push_(dir);
 
1423
                dir->open();
 
1424
                while (dir->next()) {
 
1425
                        file_name = dir->name();
 
1426
                        if (dir->isFile() && (cs_is_extension(file_name, "bs") || cs_is_extension(file_name, "bst"))) {
 
1427
                                dir->deleteEntry();
 
1428
                        }
 
1429
                }
 
1430
                release_(dir);
 
1431
                if (path->isEmpty())
 
1432
                        path->removeDir();
 
1433
        }
 
1434
        release_(path);
 
1435
 
 
1436
        // Delete table reference files.
 
1437
        dir = CSDirectory::newDirectory(RETAIN(doomedDatabasePath));
 
1438
        push_(dir);
 
1439
        dir->open();
 
1440
        while (dir->next()) {
 
1441
                file_name = dir->name();
 
1442
                if (dir->isFile() && cs_is_extension(file_name, "bst"))
 
1443
                        dir->deleteEntry();
 
1444
        }
 
1445
        release_(dir);
 
1446
 
 
1447
#ifdef HAVE_ALIAS_SUPPORT
 
1448
        path = CSPath::newPath(RETAIN(doomedDatabasePath), ACTIVE_ALIAS_INDEX);
 
1449
        push_(path);
 
1450
        path->removeFile();
 
1451
        release_(path);
 
1452
#endif
 
1453
 
 
1454
        pbms_remove_ststem_tables(RETAIN(doomedDatabasePath));
 
1455
        
 
1456
        path = CSPath::newPath(RETAIN(doomedDatabasePath));
 
1457
        push_(path);
 
1458
        if (path->isEmpty() && !path->isLink()) {
 
1459
                path->removeDir();
 
1460
        } else { 
 
1461
                CSStringBuffer *new_name;
 
1462
                // If the database folder is not empty we rename it to get it out of the way.
 
1463
                // If it is not renamed it will be reused if a database with the same name is
 
1464
                // created again wich will result in the database ID being reused which may
 
1465
                // have some bad side effects.
 
1466
                new_(new_name, CSStringBuffer());
 
1467
                push_(new_name);
 
1468
                new_name->append(cs_last_name_of_path(doomedDatabasePath->getCString()));
 
1469
                new_name->append("_DROPPED");
 
1470
                path->rename(new_name->getCString());
 
1471
                release_(new_name);
 
1472
        }
 
1473
        release_(path);
 
1474
 
 
1475
        release_(doomedDatabasePath);
 
1476
        
 
1477
        path = CSPath::newPath(ms_my_get_mysql_home_path(), "pbms");
 
1478
        push_(path);
 
1479
        if (path->isEmpty() && !path->isLink()) {
 
1480
                path->removeDir();
 
1481
        }
 
1482
        release_(path);
 
1483
 
 
1484
        exit_();
 
1485
}
 
1486
 
 
1487
/* Drop the PBMS database if it exists.
 
1488
 * The root folder 'pbms' will be deleted also
 
1489
 * if it is empty and not a symbolic link.
 
1490
 * The database folder in 'pbms' is deleted if it is empty and
 
1491
 * it is not a symbolic link.
 
1492
 */
 
1493
void MSDatabase::dropDatabase(MSDatabase *doomedDatabase, const char *db_name ) 
 
1494
{
 
1495
        CSString *doomedDatabasePath = NULL;
 
1496
 
 
1497
        enter_();
 
1498
        
 
1499
        if (doomedDatabase) {
 
1500
                push_(doomedDatabase);
 
1501
                
 
1502
                // Remove any pending transactions for the dropped database.
 
1503
                // This is important because if the database is restored it will have the
 
1504
                // same database ID and the old transactions would be applied to it.
 
1505
                MSTransactionManager::dropDatabase(doomedDatabase->myDatabaseID);
 
1506
                
 
1507
                doomedDatabasePath = doomedDatabase->myDatabasePath;
 
1508
                doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
 
1509
                
 
1510
                MSTableList::removeDatabaseTables(RETAIN(doomedDatabase));
 
1511
 
 
1512
                doomedDatabase->dropDatabase(); // Shutdown database threads.
 
1513
                
 
1514
                // To avoid a deadlock a lock is not taken on the database list
 
1515
                // if shutdown is in progress. The only database that would be
 
1516
                // dropped during a shutdown is an incomplete backup database.
 
1517
                ASSERT(doomedDatabase->isBackup || !self->myMustQuit);
 
1518
                if (!self->myMustQuit) 
 
1519
                        lock_(gDatabaseList); // Be sure to shutdown the database before locking this or it can lead to deadlocks
 
1520
                        
 
1521
                gDatabaseArray->remove(doomedDatabase->myDatabaseID);
 
1522
                if (!doomedDatabase->isBackup)
 
1523
                        gDatabaseList->remove(doomedDatabase->getKey());
 
1524
                if (!self->myMustQuit) 
 
1525
                        unlock_(gDatabaseList); 
 
1526
                ASSERT(doomedDatabase->iRefCount == 1);
 
1527
                release_(doomedDatabase);
 
1528
                
 
1529
        } else {
 
1530
                CSPath *path;
 
1531
                bool create = false;
 
1532
                uint32_t db_id;
 
1533
                
 
1534
                path = createDatabasePath(ms_my_get_mysql_home_path(), CSString::newString(db_name), &db_id, &create);
 
1535
                
 
1536
                if (path) {
 
1537
                        MSTransactionManager::dropDatabase(db_id);
 
1538
 
 
1539
                        push_(path);
 
1540
                        doomedDatabasePath = path->getString();
 
1541
                        doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
 
1542
                        release_(path);
 
1543
                }
 
1544
        }
 
1545
        
 
1546
        if (doomedDatabasePath)
 
1547
                removeDatabasePath(doomedDatabasePath);
 
1548
        
 
1549
done:
 
1550
        exit_();
 
1551
}
 
1552
 
 
1553
void MSDatabase::dropDatabase(const char *db_name ) 
 
1554
{
 
1555
        enter_();
 
1556
        dropDatabase(getDatabase(db_name, false), db_name);
 
1557
        exit_();
 
1558
}
 
1559
 
 
1560
// The table_path can be several things here:
 
1561
// 1: <absalute path>/<database>/<table>
 
1562
// 2: <absalute path>/<database>
 
1563
// 3: <database>/<table>
 
1564
bool MSDatabase::convertTablePathToIDs(const char *table_path, uint32_t *db_id, uint32_t *tab_id, bool create) 
 
1565
{
 
1566
        const char      *base = ms_my_get_mysql_home_path();
 
1567
        CSString        *table_url;
 
1568
        CSString        *db_path = NULL;
 
1569
        CSString        *db_name = NULL;
 
1570
        CSString        *tab_name = NULL;
 
1571
        MSDatabase      *db;
 
1572
        enter_();
 
1573
        
 
1574
        *db_id = 0;
 
1575
        *tab_id = 0;
 
1576
        
 
1577
        table_url = CSString::newString(table_path);
 
1578
        if (table_url->startsWith(base)) {
 
1579
                table_url = table_url->right(base);
 
1580
        }
 
1581
        push_(table_url);
 
1582
 
 
1583
 
 
1584
        db_path = table_url->left("/", -1);
 
1585
        push_(db_path);
 
1586
        tab_name = table_url->right("/", -1);
 
1587
        
 
1588
        pop_(db_path);
 
1589
        release_(table_url);
 
1590
 
 
1591
        if (db_path->length() == 0) { // Only a database name was supplied.
 
1592
                db_path->release();
 
1593
                db_name = tab_name;
 
1594
                tab_name = NULL;
 
1595
        } else {
 
1596
                if (tab_name->length() == 0) {
 
1597
                        tab_name->release();
 
1598
                        tab_name = NULL;
 
1599
                } else
 
1600
                        push_(tab_name);
 
1601
                push_(db_path);
 
1602
                db_name = db_path->right("/", -1);
 
1603
                pop_(db_path);
 
1604
                if (db_name->length() == 0) {
 
1605
                        db_name->release();
 
1606
                        db_name = db_path;
 
1607
                } else {
 
1608
                        db_path->release();
 
1609
                        db_path = NULL;
 
1610
                }
 
1611
        }
 
1612
        
 
1613
        db = MSDatabase::getDatabase(db_name, create); // This will release db_name
 
1614
        if (db) {
 
1615
                *db_id = db->myDatabaseID;
 
1616
                if (tab_name) {
 
1617
                        MSTable         *tab;
 
1618
                        pop_(tab_name);
 
1619
                        push_(db);
 
1620
                        tab = db->getTable(tab_name, create);// This will release tab_name
 
1621
                        pop_(db);
 
1622
                        if (tab) {
 
1623
                                *tab_id = tab->myTableID;
 
1624
                                tab->release();
 
1625
                        }
 
1626
                }
 
1627
                        
 
1628
                db->release();
 
1629
        }
 
1630
        
 
1631
        return_((*tab_id > 0) && (*db_id > 0));
 
1632
}
 
1633
 
 
1634
bool MSDatabase::convertTableAndDatabaseToIDs(const char *db_name, const char *tab_name, uint32_t *db_id, uint32_t *tab_id, bool create) 
 
1635
{
 
1636
        MSDatabase      *db;
 
1637
        enter_();
 
1638
        
 
1639
        *db_id = 0;
 
1640
        *tab_id = 0;
 
1641
        
 
1642
        db = MSDatabase::getDatabase(db_name, create); 
 
1643
        if (db) {
 
1644
                push_(db);
 
1645
                *db_id = db->myDatabaseID;
 
1646
                if (tab_name) {
 
1647
                        MSTable         *tab;
 
1648
                        tab = db->getTable(tab_name, create);
 
1649
                        if (tab) {
 
1650
                                *tab_id = tab->myTableID;
 
1651
                                tab->release();
 
1652
                        }
 
1653
                }
 
1654
                        
 
1655
                release_(db);
 
1656
        }
 
1657
        
 
1658
        return_((*tab_id > 0) && (*db_id > 0));
 
1659
}
 
1660
 
 
1661
MSDatabase *MSDatabase::loadDatabase(CSString *db_name, bool create)
 
1662
{
 
1663
        MSDatabase *db;
 
1664
        enter_();
 
1665
        
 
1666
        db = newDatabase(ms_my_get_mysql_home_path(), db_name, 0, create);
 
1667
        
 
1668
        if (db) {
 
1669
                push_(db);
 
1670
                
 
1671
                gDatabaseList->add(RETAIN(db));
 
1672
                
 
1673
                gDatabaseArray->set(db->myDatabaseID, RETAIN(db));
 
1674
                db->startThreads();
 
1675
                pbms_load_system_tables(RETAIN(db));
 
1676
                        
 
1677
                pop_(db);
 
1678
        }
 
1679
        return_(db);
 
1680
}
 
1681
 
 
1682
uint32_t MSDatabase::getDatabaseID(CSString *db_name, bool create)
 
1683
{
 
1684
        MSDatabase *db;
 
1685
        uint32_t id = 0;
 
1686
        enter_();
 
1687
        push_(db_name);
 
1688
        
 
1689
        
 
1690
        lock_(gDatabaseList);
 
1691
        if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
 
1692
                db = MSDatabase::loadDatabase(RETAIN(db_name), create);
 
1693
                if (!db)
 
1694
                        goto exit;
 
1695
                id = db->myDatabaseID;
 
1696
                db->release();
 
1697
        } else
 
1698
                id = db->myDatabaseID;
 
1699
        
 
1700
        exit:
 
1701
        unlock_(gDatabaseList);
 
1702
        release_(db_name);
 
1703
        return_(id);
 
1704
}
 
1705
 
 
1706
 
 
1707
uint32_t MSDatabase::getDatabaseID(const char *db_name, bool create)
 
1708
{
 
1709
        return getDatabaseID(CSString::newString(db_name), create);
 
1710
}
 
1711
 
 
1712
MSDatabase *MSDatabase::getBackupDatabase(CSString *db_location, CSString *db_name, uint32_t db_id, bool create)
 
1713
{
 
1714
        bool was_created = create; 
 
1715
        CSPath *path;
 
1716
        MSDatabase *db;
 
1717
        enter_();
 
1718
        
 
1719
        push_(db_location);
 
1720
        push_(db_name);
 
1721
        // If the db already exists and 'create' == true then the existing db
 
1722
        // must be deleted.
 
1723
        // Create the database path, if 'create' == false then it can return NULL
 
1724
        path = createDatabasePath(db_location->getCString(), RETAIN(db_name), &db_id, &was_created);
 
1725
        if (!path) {
 
1726
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
 
1727
        }
 
1728
        push_(path);
 
1729
        
 
1730
        // If we wanted to create it but it already exists then throw an error.
 
1731
        if ( create && !was_created) {
 
1732
                char str[120];
 
1733
                snprintf(str, 120, "Duplicate database: %s", db_name->getCString());
 
1734
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, str);
 
1735
        }
 
1736
                
 
1737
        release_(path);
 
1738
        pop_(db_name);
 
1739
        // everything looks OK
 
1740
        db = newDatabase(db_location->getCString(), db_name, db_id, create);
 
1741
        db->setBackupDatabase();
 
1742
        release_(db_location);
 
1743
        return_(db);
 
1744
}
 
1745