~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/database_ms.cc

This patch completes the first step in the splitting of
the XA resource manager API from the storage engine API,
as outlined in the specification here:

http://drizzle.org/wiki/XaStorageEngine

* Splits plugin::StorageEngine into a base StorageEngine
  class and two derived classes, TransactionalStorageEngine
  and XaStorageEngine.  XaStorageEngine derives from
  TransactionalStorageEngine and creates the XA Resource
  Manager API for storage engines.

  - The methods moved from StorageEngine to TransactionalStorageEngine
    include releaseTemporaryLatches(), startConsistentSnapshot(), 
    commit(), rollback(), setSavepoint(), releaseSavepoint(),
    rollbackToSavepoint() and hasTwoPhaseCommit()
  - The methods moved from StorageEngine to XaStorageEngine
    include recover(), commitXid(), rollbackXid(), and prepare()

* Places all static "EngineVector"s into their proper
  namespaces (typedefs belong in header files, not implementation files)
  and places all static methods corresponding
  to either only transactional engines or only XA engines
  into their respective files in /drizzled/plugin/

* Modifies the InnoDB "handler" files to extend plugin::XaStorageEngine
  and not plugin::StorageEngine

The next step, as outlined in the wiki spec page above, is to isolate
the XA Resource Manager API into its own plugin class and modify
plugin::XaStorageEngine to implement plugin::XaResourceManager via
composition.  This is necessary to enable building plugins that can
participate in an XA transaction *without requiring the plugin to
implement the entire storage engine API*.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original author: Paul McCullagh
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-05-25
23
 
 *
24
 
 * H&G2JCtL
25
 
 *
26
 
 * Network interface.
27
 
 *
28
 
 */
29
 
 
30
 
#ifdef DRIZZLED
31
 
#include <config.h>
32
 
#include <drizzled/common.h>
33
 
#include <drizzled/session.h>
34
 
#include <drizzled/table.h>
35
 
#include <drizzled/message/table.pb.h>
36
 
#include <drizzled/charset_info.h>
37
 
#include <drizzled/table_proto.h>
38
 
#include <drizzled/field.h>
39
 
#endif
40
 
 
41
 
#include "cslib/CSConfig.h"
42
 
 
43
 
#include <string.h>
44
 
#include <ctype.h>
45
 
#include <inttypes.h>
46
 
 
47
 
#include "string.h"
48
 
 
49
 
#include "cslib/CSGlobal.h"
50
 
#include "cslib/CSLog.h"
51
 
#include "cslib/CSDirectory.h"
52
 
#include "cslib/CSStrUtil.h"
53
 
 
54
 
#include "database_ms.h"
55
 
#include "open_table_ms.h"
56
 
#include "backup_ms.h"
57
 
#include "table_ms.h"
58
 
#include "temp_log_ms.h"
59
 
#include "network_ms.h"
60
 
#include "mysql_ms.h"
61
 
#include "pbmslib.h"
62
 
#include "transaction_ms.h"
63
 
//#include "systab_variable_ms.h"
64
 
#include "systab_httpheader_ms.h"
65
 
#include "parameters_ms.h"
66
 
#include "pbmsdaemon_ms.h"
67
 
 
68
 
 
69
 
 
70
 
CSSyncSortedList        *MSDatabase::gDatabaseList;
71
 
CSSparseArray           *MSDatabase::gDatabaseArray;
72
 
/*
73
 
 * -------------------------------------------------------------------------
74
 
 * PBMS DATABASES
75
 
 */
76
 
 
77
 
/*
 * Construct an empty database object: every pointer member starts out NULL,
 * every flag false and every counter zero. The initializer order below is
 * kept exactly as written in the original source.
 */
MSDatabase::MSDatabase():
	myIsPBMS(false),
	myDatabaseID(0),
	myDatabaseName(NULL),
	myDatabasePath(NULL),
	myTempLogArray(NULL),
	myCompactorThread(NULL),
	myTempLogThread(NULL),
	myRepostoryList(NULL),
	myBlobCloud(NULL),
	myBlobType(MS_STANDARD_STORAGE),
	isBackup(false),
	iBackupThread(NULL),
	iBackupTime(0),
	iRecovering(false),
#ifdef HAVE_ALIAS_SUPPORT
	iBlobAliases(NULL),
#endif
	iClosing(false),
	iTableList(NULL),
	iTableArray(NULL),
	iMaxTableID(0),
	iWriteTempLog(NULL),
	iDropping(false),
	iNextBlobRefId(0)
{
	//startTracking();
}
105
 
 
106
 
MSDatabase::~MSDatabase()
107
 
{
108
 
        iClosing = true;
109
 
        if (iBackupThread) {
110
 
                iBackupThread->stop();
111
 
                iBackupThread->release();
112
 
                iBackupThread = NULL;
113
 
        }
114
 
        
115
 
        if (myTempLogThread) {
116
 
                myTempLogThread->stop();
117
 
                myTempLogThread->release();
118
 
                myTempLogThread = NULL;
119
 
        }
120
 
        if (myCompactorThread) {
121
 
                myRepostoryList->wakeup(); // The compator thread waits on this.
122
 
                myCompactorThread->stop();
123
 
                myCompactorThread->release();
124
 
                myCompactorThread = NULL;
125
 
        }
126
 
        
127
 
        if (myDatabasePath)
128
 
                myDatabasePath->release();
129
 
                
130
 
        if (myDatabaseName)
131
 
                myDatabaseName->release();
132
 
                
133
 
        iWriteTempLog = NULL;
134
 
        if (myTempLogArray) {
135
 
                myTempLogArray->clear();
136
 
                myTempLogArray->release();
137
 
        }
138
 
        if (iTableList) {
139
 
                iTableList->clear();
140
 
                iTableList->release();
141
 
        }
142
 
        if (iTableArray){
143
 
                iTableArray->clear();
144
 
                iTableArray->release();
145
 
        }
146
 
        if (myRepostoryList) {
147
 
                myRepostoryList->clear();
148
 
                myRepostoryList->release();
149
 
        }
150
 
#ifdef HAVE_ALIAS_SUPPORT
151
 
        if (iBlobAliases) {
152
 
                iBlobAliases->ma_close();
153
 
                iBlobAliases->release();
154
 
        }
155
 
#endif
156
 
        if (myBlobCloud) {
157
 
                myBlobCloud->release();
158
 
        }
159
 
}
160
 
 
161
 
uint32_t MSDatabase::fileToTableId(const char *file_name, const char *name_part)
162
 
{
163
 
        uint32_t value = 0;
164
 
 
165
 
        if (file_name) {
166
 
                const char *num = file_name +  strlen(file_name) - 1;
167
 
                
168
 
                while (num >= file_name && *num != '-')
169
 
                        num--;
170
 
                if (name_part) {
171
 
                        /* Check the name part of the file: */
172
 
                        int len = strlen(name_part);
173
 
                        
174
 
                        if (len != num - file_name)
175
 
                                return 0;
176
 
                        if (strncmp(file_name, name_part, len) != 0)
177
 
                                return 0;
178
 
                }
179
 
                num++;
180
 
                if (isdigit(*num))
181
 
                        sscanf(num, "%"PRIu32"", &value);
182
 
        }
183
 
        return value;
184
 
}
185
 
 
186
 
const char *MSDatabase::fileToTableName(size_t size, char *tab_name, const char *file_name)
187
 
{
188
 
        const char      *cptr;
189
 
        size_t          len;
190
 
 
191
 
        file_name = cs_last_name_of_path(file_name);
192
 
        cptr = file_name + strlen(file_name) - 1;
193
 
        while (cptr > file_name && *cptr != '.')
194
 
                cptr--;
195
 
        if (cptr > file_name && *cptr == '.') {
196
 
                if (strncmp(cptr, ".bs", 2) == 0) {
197
 
                        cptr--;
198
 
                        while (cptr > file_name && isdigit(*cptr))
199
 
                                cptr--;
200
 
                }
201
 
        }
202
 
 
203
 
        len = cptr - file_name;
204
 
        if (len > size-1)
205
 
                len = size-1;
206
 
 
207
 
        memcpy(tab_name, file_name, len);
208
 
        tab_name[len] = 0;
209
 
 
210
 
        /* Return a pointer to what was removed! */
211
 
        return file_name + len;
212
 
}
213
 
 
214
 
 
215
 
const char *MSDatabase::getDatabaseNameCString()
216
 
{
217
 
        return myDatabaseName->getCString();
218
 
}
219
 
 
220
 
MSTable *MSDatabase::getTable(CSString *tab_name, bool create)
221
 
{
222
 
        MSTable *tab;
223
 
        
224
 
        enter_();
225
 
        push_(tab_name);
226
 
        lock_(iTableList);
227
 
        if (!(tab = (MSTable *) iTableList->find(tab_name))) {
228
 
 
229
 
                if (create) {
230
 
                        /* Create a new table: */
231
 
                        tab = MSTable::newTable(iMaxTableID+1, RETAIN(tab_name), this, (off64_t) 0, false);
232
 
                        iTableList->add(tab);
233
 
                        iTableArray->set(iMaxTableID+1, RETAIN(tab));
234
 
                        iMaxTableID++;
235
 
                }
236
 
        }
237
 
        if (tab)
238
 
                tab->retain();
239
 
        unlock_(iTableList);
240
 
        release_(tab_name);
241
 
        return_(tab);
242
 
}
243
 
 
244
 
MSTable *MSDatabase::getTable(const char *tab_name, bool create)
245
 
{
246
 
        return getTable(CSString::newString(tab_name), create);
247
 
}
248
 
 
249
 
 
250
 
MSTable *MSDatabase::getTable(uint32_t tab_id, bool missing_ok)
251
 
{
252
 
        MSTable *tab;
253
 
        
254
 
        enter_();
255
 
        lock_(iTableList);
256
 
        if (!(tab = (MSTable *) iTableArray->get((uint32_t) tab_id))) {
257
 
                if (missing_ok) {
258
 
                        unlock_(iTableList);
259
 
                        return_(NULL);
260
 
                }
261
 
                char buffer[CS_EXC_MESSAGE_SIZE];
262
 
 
263
 
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown table #");
264
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) tab_id);
265
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " in database ");
266
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, getDatabaseNameCString());
267
 
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, buffer);
268
 
        }
269
 
        tab->retain();
270
 
        unlock_(iTableList);
271
 
        return_(tab);
272
 
}
273
 
 
274
 
MSTable *MSDatabase::getNextTable(uint32_t *pos)
275
 
{
276
 
        uint32_t i = *pos;
277
 
        MSTable *tab = NULL;
278
 
        
279
 
        enter_();
280
 
        lock_(iTableList);
281
 
        while (i < iTableList->getSize()) {
282
 
                tab = (MSTable *) iTableList->itemAt(i++);
283
 
                if (!tab->isToDelete())
284
 
                        break;
285
 
                tab = NULL;
286
 
        }
287
 
        if (tab)
288
 
                tab->retain();
289
 
        unlock_(iTableList);
290
 
        *pos = i;
291
 
        return_(tab);
292
 
}
293
 
 
294
 
void MSDatabase::addTable(uint32_t tab_id, const char *tab_name, off64_t file_size, bool to_delete)
295
 
{
296
 
        MSTable *tab;
297
 
 
298
 
        if (tab_id > iMaxTableID)
299
 
                iMaxTableID = tab_id;
300
 
        tab = MSTable::newTable(tab_id, tab_name, this, file_size, to_delete);
301
 
        iTableList->add(tab);
302
 
        iTableArray->set(tab_id, RETAIN(tab));
303
 
}
304
 
 
305
 
void MSDatabase::addTableFromFile(CSDirectory *dir, const char *file_name, bool to_delete)
306
 
{
307
 
        off64_t file_size;
308
 
        uint32_t        file_id;
309
 
        char    tab_name[MS_TABLE_NAME_SIZE];
310
 
 
311
 
        dir->info(NULL, &file_size, NULL);
312
 
        file_id = fileToTableId(file_name);
313
 
        fileToTableName(MS_TABLE_NAME_SIZE, tab_name, file_name);
314
 
        addTable(file_id, tab_name, file_size, to_delete);
315
 
}
316
 
 
317
 
void MSDatabase::removeTable(MSTable *tab)
318
 
{
319
 
        enter_();
320
 
        push_(tab);
321
 
        lock_(iTableList);
322
 
        iTableList->remove(tab->myTableName);
323
 
        iTableArray->remove(tab->myTableID);
324
 
        unlock_(iTableList);
325
 
        release_(tab);
326
 
        exit_();
327
 
}
328
 
 
329
 
void MSDatabase::dropTable(MSTable *tab)
330
 
{
331
 
        enter_();
332
 
        push_(tab);
333
 
        lock_(iTableList);
334
 
        iTableList->remove(tab->myTableName);
335
 
        iTableArray->remove(tab->myTableID);
336
 
 
337
 
        // Cute: you drop the table by adding it with the 'to_delete' flag set to 'true'
338
 
        addTable(tab->myTableID, tab->myTableName->getCString(), tab->getTableFileSize(), true);
339
 
 
340
 
        unlock_(iTableList);
341
 
        release_(tab);
342
 
        exit_();
343
 
}
344
 
 
345
 
// This function is used when dropping tables from a database before
346
 
// dropping the database itself. 
347
 
CSString *MSDatabase::getATableName()
348
 
{
349
 
        uint32_t i = 0;
350
 
        MSTable *tab;
351
 
        CSString *name = NULL;
352
 
        
353
 
        enter_();
354
 
        lock_(iTableList);
355
 
 
356
 
        while ((tab = (MSTable *) iTableList->itemAt(i++)) && tab->isToDelete()) ;
357
 
        if (tab) {
358
 
                name = tab->getTableName();
359
 
                name->retain();
360
 
        }
361
 
        unlock_(iTableList);
362
 
        return_(name);
363
 
}
364
 
 
365
 
uint32_t MSDatabase::getTableCount()
366
 
{
367
 
        uint32_t cnt = 0, i = 0;
368
 
        MSTable *tab;
369
 
        
370
 
        enter_();
371
 
        lock_(iTableList);
372
 
 
373
 
        while ((tab = (MSTable *) iTableList->itemAt(i++))) {
374
 
                if (!tab->isToDelete())
375
 
                        cnt++;
376
 
        }
377
 
 
378
 
        unlock_(iTableList);
379
 
        return_(cnt);
380
 
}
381
 
 
382
 
 
383
 
void MSDatabase::renameTable(MSTable *tab, const char *to_name)
384
 
{
385
 
        enter_();
386
 
        lock_(iTableList);
387
 
        iTableList->remove(tab->myTableName);
388
 
        iTableArray->remove(tab->myTableID);
389
 
 
390
 
        addTable(tab->myTableID, to_name, tab->getTableFileSize(), false);
391
 
 
392
 
        unlock_(iTableList);
393
 
        exit_();
394
 
}
395
 
 
396
 
void MSDatabase::openWriteRepo(MSOpenTable *otab)
397
 
{
398
 
        if (otab->myWriteRepo && otab->myWriteRepoFile)
399
 
                return;
400
 
 
401
 
        enter_();
402
 
        if (!otab->myWriteRepo)
403
 
                otab->myWriteRepo = lockRepo(0);
404
 
 
405
 
        /* Now open the repo file for the open table: */
406
 
        otab->myWriteRepo->openRepoFileForWriting(otab);
407
 
        exit_();
408
 
}
409
 
 
410
 
MSRepository *MSDatabase::getRepoFullOfTrash(time_t *ret_wait_time)
411
 
{
412
 
        MSRepository    *repo = NULL;
413
 
        time_t                  wait_time = 0;
414
 
        
415
 
        if (ret_wait_time)
416
 
                wait_time = *ret_wait_time;
417
 
        enter_();
418
 
        lock_(myRepostoryList);
419
 
        for (uint32_t i=0; i<myRepostoryList->size(); i++) {
420
 
                retry:
421
 
                if ((repo = (MSRepository *) myRepostoryList->get(i))) {
422
 
                        if (!repo->isRemovingFP && !repo->mustBeDeleted && !repo->isRepoLocked()) {
423
 
                                if (!repo->myRepoHeadSize) {
424
 
                                        /* The file has not yet been opened, so the
425
 
                                         * garbage count will not be known!
426
 
                                         */
427
 
                                        MSRepoFile *repo_file;
428
 
 
429
 
                                        repo->retain();
430
 
                                        unlock_(myRepostoryList);
431
 
                                        push_(repo);
432
 
                                        repo_file = repo->openRepoFile();
433
 
                                        repo_file->release();
434
 
                                        release_(repo);
435
 
                                        lock_(myRepostoryList);
436
 
                                        goto retry;
437
 
                                }
438
 
                                if (repo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
439
 
                                        /* Make sure there are not temp BLOBs in this repository that have
440
 
                                         * not yet timed out:
441
 
                                         */
442
 
                                        time_t now = time(NULL);
443
 
                                        time_t then = repo->myLastTempTime;
444
 
 
445
 
                                        /* Check if there are any temp BLOBs to be removed: */
446
 
                                        if (now > (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
447
 
                                                repo->lockRepo(REPO_COMPACTING); 
448
 
                                                repo->retain();
449
 
                                                break;
450
 
                                        }
451
 
                                        else {
452
 
                                                /* There are temp BLOBs to wait for... */
453
 
                                                if (!wait_time || wait_time > MSTempLog::adjustWaitTime(then, now))
454
 
                                                        wait_time = MSTempLog::adjustWaitTime(then, now);
455
 
                                        }
456
 
                                }
457
 
                        }
458
 
                        repo = NULL;
459
 
                }
460
 
        }
461
 
        unlock_(myRepostoryList);
462
 
        if (ret_wait_time)
463
 
                *ret_wait_time = wait_time;
464
 
        return_(repo);
465
 
}
466
 
 
467
 
MSRepository *MSDatabase::lockRepo(off64_t size)
468
 
{
469
 
        MSRepository    *repo;
470
 
        uint32_t                        free_slot;
471
 
 
472
 
        enter_();
473
 
        lock_(myRepostoryList);
474
 
        free_slot = myRepostoryList->size();
475
 
        /* Find an unlocked repository file that is below the write threshold: */
476
 
        for (uint32_t i=0; i<myRepostoryList->size(); i++) {
477
 
                if ((repo = (MSRepository *) myRepostoryList->get(i))) {
478
 
                        if ((!repo->isRepoLocked()) && (!repo->isRemovingFP) && (!repo->mustBeDeleted) &&
479
 
                                ((repo->myRepoFileSize + size) < PBMSParameters::getRepoThreshold()) 
480
 
                                /**/ && (repo->getGarbageLevel() < PBMSParameters::getGarbageThreshold()))
481
 
                                goto found1;
482
 
                }
483
 
                else {
484
 
                        if (i < free_slot)
485
 
                                free_slot = i;
486
 
                }
487
 
        }
488
 
 
489
 
        /* None found, create a new repo file: */
490
 
        new_(repo, MSRepository(free_slot + 1, this, 0));
491
 
        myRepostoryList->set(free_slot, repo);
492
 
 
493
 
        found1:
494
 
        repo->retain();
495
 
        repo->lockRepo(REPO_WRITE);  // <- The MSRepository::backToPool() will unlock this.
496
 
        unlock_(myRepostoryList);
497
 
        return_(repo);
498
 
}
499
 
 
500
 
MSRepoFile *MSDatabase::getRepoFileFromPool(uint32_t repo_id, bool missing_ok)
501
 
{
502
 
        MSRepository    *repo;
503
 
        MSRepoFile              *file;
504
 
 
505
 
        enter_();
506
 
        lock_(myRepostoryList);
507
 
        if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1))) {
508
 
                if (!missing_ok) {
509
 
                        char buffer[CS_EXC_MESSAGE_SIZE];
510
 
 
511
 
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown repository file: ");
512
 
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
513
 
                        CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
514
 
                }
515
 
                unlock_(myRepostoryList);
516
 
                return_(NULL);
517
 
        }
518
 
        if (repo->isRemovingFP) {
519
 
                char buffer[CS_EXC_MESSAGE_SIZE];
520
 
 
521
 
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Repository will be removed: ");
522
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
523
 
                CSException::throwException(CS_CONTEXT, MS_ERR_REMOVING_REPO, buffer);
524
 
        }
525
 
        repo->retain(); /* Release is here: [++] */
526
 
        file = repo->getRepoFile();
527
 
        unlock_(myRepostoryList);
528
 
 
529
 
        if (!file) {
530
 
                file = repo->openRepoFile();
531
 
                lock_(myRepostoryList);
532
 
                repo->addRepoFile(file);
533
 
                file->retain();
534
 
                unlock_(myRepostoryList);
535
 
        }
536
 
        return_(file);
537
 
}
538
 
 
539
 
void MSDatabase::returnRepoFileToPool(MSRepoFile *file)
540
 
{
541
 
        MSRepository    *repo;
542
 
 
543
 
        enter_();
544
 
        lock_(myRepostoryList);
545
 
        push_(file);
546
 
        if ((repo = file->myRepo)) {
547
 
                if (repo->isRemovingFP) {
548
 
                        repo->removeRepoFile(RETAIN(file));
549
 
                        myRepostoryList->wakeup();
550
 
                }
551
 
                else
552
 
                        repo->returnRepoFile(RETAIN(file));
553
 
                repo->release(); /* [++] here is the release.  */
554
 
        }
555
 
        release_(file);
556
 
        unlock_(myRepostoryList);
557
 
        exit_();
558
 
}
559
 
 
560
 
void MSDatabase::removeRepo(uint32_t repo_id, bool *mustQuit)
561
 
{
562
 
        MSRepository *repo;
563
 
 
564
 
        enter_();
565
 
        lock_(myRepostoryList);
566
 
        while ((!mustQuit || !*mustQuit) && !iClosing) {
567
 
                if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1)))
568
 
                        break;
569
 
                repo->isRemovingFP = true;
570
 
                if (repo->removeRepoFilesNotInUse()) {
571
 
                        myRepostoryList->set(repo_id - 1, NULL);
572
 
                        break;
573
 
                }
574
 
                /*
575
 
                 * Wait for the files that are in use to be
576
 
                 * freed.
577
 
                 */
578
 
                myRepostoryList->wait();
579
 
        }
580
 
        unlock_(myRepostoryList);
581
 
        exit_();
582
 
}
583
 
 
584
 
void MSDatabase::queueTempLogEvent(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
585
 
         uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time)
586
 
{
587
 
        MSTempLogItemRec        item;
588
 
        uint32_t                                timev;
589
 
 
590
 
        // Each otab object holds an handle to an instance of an OPEN
591
 
        // temp log. This is so that each thread has it's own open temp log 
592
 
        // and doesn't need to be opened and close it constantly.
593
 
        
594
 
        enter_();
595
 
        lock_(myTempLogArray);
596
 
        if (!iWriteTempLog) {
597
 
                iWriteTempLog = (MSTempLog *) myTempLogArray->last();
598
 
                if (!iWriteTempLog) {
599
 
                        new_(iWriteTempLog, MSTempLog(1, this, 0));
600
 
                        myTempLogArray->set(1, iWriteTempLog);
601
 
                }
602
 
        }
603
 
        if (!otab->myTempLogFile)
604
 
                otab->myTempLogFile = iWriteTempLog->openTempLog();
605
 
        else if (otab->myTempLogFile->myTempLogID != iWriteTempLog->myLogID) {
606
 
                otab->myTempLogFile->release();
607
 
                otab->myTempLogFile = NULL;
608
 
                otab->myTempLogFile = iWriteTempLog->openTempLog();
609
 
        }
610
 
 
611
 
        if (iWriteTempLog->myTempLogSize >= PBMSParameters::getTempLogThreshold()) {
612
 
                uint32_t tmp_log_id = iWriteTempLog->myLogID + 1;
613
 
 
614
 
                new_(iWriteTempLog, MSTempLog(tmp_log_id, this, 0));
615
 
                myTempLogArray->set(tmp_log_id, iWriteTempLog);
616
 
 
617
 
                otab->myTempLogFile->release();
618
 
                otab->myTempLogFile = NULL;
619
 
                otab->myTempLogFile = iWriteTempLog->openTempLog();
620
 
 
621
 
        }
622
 
 
623
 
        timev = time(NULL);
624
 
        *log_id = iWriteTempLog->myLogID;
625
 
        *log_offset = (uint32_t) iWriteTempLog->myTempLogSize;
626
 
        if (q_time)
627
 
                *q_time = timev;
628
 
        iWriteTempLog->myTempLogSize += iWriteTempLog->myTemplogRecSize;
629
 
        unlock_(myTempLogArray);
630
 
 
631
 
        CS_SET_DISK_1(item.ti_type_1, type);
632
 
        CS_SET_DISK_4(item.ti_table_id_4, tab_id);
633
 
        CS_SET_DISK_6(item.ti_blob_id_6, blob_id);
634
 
        CS_SET_DISK_4(item.ti_auth_code_4, auth_code);
635
 
        CS_SET_DISK_4(item.ti_time_4, timev);
636
 
        otab->myTempLogFile->write(&item, *log_offset, sizeof(MSTempLogItemRec));
637
 
        
638
 
        
639
 
        exit_();
640
 
}
641
 
 
642
 
#ifdef HAVE_ALIAS_SUPPORT
643
 
void MSDatabase::queueForDeletion(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
644
 
         uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time, MSDiskAliasPtr aliasDiskRec)
645
 
{
646
 
        enter_();
647
 
        
648
 
        queueTempLogEvent(otab, type, tab_id, blob_id, auth_code, log_id, log_offset, q_time);
649
 
                
650
 
        // If it has an alias remove it from the ailias index.
651
 
        if (aliasDiskRec) {
652
 
                try_(a) {
653
 
                        deleteBlobAlias(aliasDiskRec);
654
 
                }
655
 
                catch_(a);
656
 
                self->logException();
657
 
                cont_(a);
658
 
        }
659
 
        
660
 
        exit_();
661
 
}
662
 
#endif
663
 
 
664
 
MSTempLogFile *MSDatabase::openTempLogFile(uint32_t log_id, size_t *log_rec_size, size_t *log_head_size)
665
 
{
666
 
        MSTempLog               *log;
667
 
        MSTempLogFile   *log_file = NULL;
668
 
 
669
 
        enter_();
670
 
        lock_(myTempLogArray);
671
 
        if (log_id)
672
 
                log = (MSTempLog *) myTempLogArray->get(log_id);
673
 
        else
674
 
                log = (MSTempLog *) myTempLogArray->first();
675
 
        if (log) {
676
 
                log_file = log->openTempLog();
677
 
                if (log_rec_size)
678
 
                        *log_rec_size = log->myTemplogRecSize;
679
 
                if (log_head_size)
680
 
                        *log_head_size = log->myTempLogHeadSize;
681
 
        }
682
 
        unlock_(myTempLogArray);
683
 
        return_(log_file);
684
 
}
685
 
 
686
 
uint32_t MSDatabase::getTempLogCount()
687
 
{
688
 
        uint32_t count;
689
 
 
690
 
        enter_();
691
 
        lock_(myTempLogArray);
692
 
        count = myTempLogArray->size();
693
 
        unlock_(myTempLogArray);
694
 
        return_(count);
695
 
}
696
 
 
697
 
void MSDatabase::removeTempLog(uint32_t log_id)
698
 
{
699
 
        enter_();
700
 
        lock_(myTempLogArray);
701
 
        myTempLogArray->remove(log_id);
702
 
        unlock_(myTempLogArray);
703
 
        exit_();
704
 
}
705
 
 
706
 
CSObject *MSDatabase::getKey()
707
 
{
708
 
        return (CSObject *) myDatabaseName;
709
 
}
710
 
 
711
 
int MSDatabase::compareKey(CSObject *key)
712
 
{
713
 
        return myDatabaseName->compare((CSString *) key);
714
 
}
715
 
 
716
 
MSCompactorThread *MSDatabase::getCompactorThread()
717
 
{
718
 
        return myCompactorThread;
719
 
}
720
 
 
721
 
CSSyncVector *MSDatabase::getRepositoryList()
722
 
{       
723
 
        return myRepostoryList;
724
 
}
725
 
 
726
 
#ifdef HAVE_ALIAS_SUPPORT
727
 
uint32_t MSDatabase::registerBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
728
 
{
729
 
        uint32_t hash;
730
 
        bool can_retry = true;
731
 
        enter_();
732
 
        
733
 
retry:
734
 
        lock_(&iBlobAliaseLock);
735
 
        
736
 
        try_(a) {
737
 
                hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
738
 
        }
739
 
        
740
 
        catch_(a) {
741
 
                unlock_(&iBlobAliaseLock);
742
 
                if (can_retry) {
743
 
                        // It can be that a duplicater alias exists that was deleted
744
 
                        // but the transaction has not been written to the repository yet.
745
 
                        // Flush all committed transactions to the repository file.
746
 
                        MSTransactionManager::flush();
747
 
                        can_retry = false;
748
 
                        goto retry;
749
 
                }
750
 
                throw_();
751
 
        }
752
 
        
753
 
        cont_(a);
754
 
        unlock_(&iBlobAliaseLock);
755
 
        return_(hash);
756
 
}
757
 
 
758
 
uint32_t MSDatabase::updateBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t old_alias_hash, const char *alias)
759
 
{
760
 
        uint32_t new_hash;
761
 
        enter_();
762
 
        lock_(&iBlobAliaseLock);
763
 
        
764
 
        new_hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
765
 
        iBlobAliases->deleteAlias(repo_id, repo_offset, old_alias_hash);
766
 
 
767
 
        unlock_(&iBlobAliaseLock);
768
 
        return_(new_hash);
769
 
}
770
 
 
771
 
void MSDatabase::deleteBlobAlias(MSDiskAliasPtr diskRec)
772
 
{
773
 
        enter_();
774
 
        lock_(&iBlobAliaseLock);
775
 
        iBlobAliases->deleteAlias(diskRec);
776
 
        unlock_(&iBlobAliaseLock);
777
 
        exit_();
778
 
}
779
 
 
780
 
void MSDatabase::deleteBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
781
 
{
782
 
        MSDiskAliasRec diskRec;
783
 
        
784
 
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
785
 
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
786
 
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
787
 
        deleteBlobAlias(&diskRec);
788
 
}
789
 
 
790
 
void MSDatabase::moveBlobAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
791
 
{
792
 
        enter_();
793
 
        lock_(&iBlobAliaseLock);
794
 
        iBlobAliases->resetAlias(old_repo_id, old_repo_offset, alias_hash, new_repo_id, new_repo_offset);
795
 
        unlock_(&iBlobAliaseLock);
796
 
        exit_();
797
 
}
798
 
#endif
799
 
 
800
 
bool MSDatabase::isValidHeaderField(const char *name)
801
 
{
802
 
        bool is_valid = false;
803
 
        CSString                *header;
804
 
        enter_();
805
 
 
806
 
        if (name && *name) {
807
 
                if (strcasecmp(name, MS_ALIAS_TAG)) {
808
 
                        lock_(&iHTTPMetaDataHeaders);
809
 
                        header = CSString::newString(name);
810
 
                        push_(header);
811
 
                                
812
 
                        is_valid = (iHTTPMetaDataHeaders.find(header) != NULL);
813
 
                        release_(header);
814
 
                        
815
 
                        unlock_(&iHTTPMetaDataHeaders);
816
 
                } else 
817
 
                        is_valid = true;
818
 
        }
819
 
        
820
 
        return_(is_valid);
821
 
}
822
 
 
823
 
void MSDatabase::startUp(const char *default_http_headers)
824
 
{
825
 
        enter_();
826
 
        
827
 
        new_(gDatabaseList, CSSyncSortedList);
828
 
        new_(gDatabaseArray, CSSparseArray(5));
829
 
        MSHTTPHeaderTable::setDefaultMetaDataHeaders(default_http_headers);
830
 
        PBMSSystemTables::systemTablesStartUp();
831
 
        PBMSParameters::setBackupDatabaseID(1);
832
 
        exit_();
833
 
}
834
 
 
835
 
void MSDatabase::stopThreads()
836
 
{
837
 
        MSDatabase *db;
838
 
 
839
 
        enter_();
840
 
        if (gDatabaseList) {
841
 
                lock_(gDatabaseList);
842
 
                for (int i=0;;i++) {
843
 
                        if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
844
 
                                break;
845
 
                        db->iClosing = true;
846
 
                        
847
 
                        if (db->myTempLogThread) {
848
 
                                db->myTempLogThread->stop();
849
 
                                db->myTempLogThread->release();
850
 
                                db->myTempLogThread = NULL;
851
 
                        }
852
 
                        if (db->myCompactorThread) {
853
 
                                db->myRepostoryList->wakeup(); // The compator thread waits on this.
854
 
                                db->myCompactorThread->stop();
855
 
                                db->myCompactorThread->release();
856
 
                                db->myCompactorThread = NULL;
857
 
                        }
858
 
                        
859
 
                        if (db->iBackupThread) {
860
 
                                db->iBackupThread->stop();
861
 
                                db->iBackupThread->release();
862
 
                                db->iBackupThread = NULL;
863
 
                        }
864
 
        
865
 
                }
866
 
                
867
 
                unlock_(gDatabaseList);
868
 
        }
869
 
        exit_();
870
 
}
871
 
 
872
 
void MSDatabase::shutDown()
873
 
{
874
 
                
875
 
        if (gDatabaseArray) {
876
 
                gDatabaseArray->clear();
877
 
                gDatabaseArray->release();
878
 
                gDatabaseArray = NULL;
879
 
        }
880
 
        
881
 
        if (gDatabaseList) {
882
 
                gDatabaseList->clear();
883
 
                gDatabaseList->release();
884
 
                gDatabaseList = NULL;
885
 
        }
886
 
        
887
 
        MSHTTPHeaderTable::releaseDefaultMetaDataHeaders();
888
 
        PBMSSystemTables::systemTableShutDown();
889
 
}
890
 
 
891
 
void MSDatabase::setBackupDatabase()
892
 
{
893
 
        enter_();
894
 
        // I need to give the backup database a unique fake database ID.
895
 
        // This is so that it is not confused with the database being backed
896
 
        // backed up when opening tables.
897
 
        
898
 
        // Normally database IDs are generated by time(NULL) so small database IDs
899
 
        // are safe to use as fake IDs.
900
 
         
901
 
        lock_(gDatabaseList);
902
 
        myDatabaseID = PBMSParameters::getBackupDatabaseID() +1;
903
 
        PBMSParameters::setBackupDatabaseID(myDatabaseID);
904
 
        gDatabaseArray->set(myDatabaseID, RETAIN(this));
905
 
        isBackup = true;
906
 
        
907
 
        // Notify the cloud storage, if any, that it is a backup.
908
 
        // This is important because if the backup database is dropped
909
 
        // we need to be sure that only the BLOBs belonging to the
910
 
        // backup are removed from the cloud.
911
 
        myBlobCloud->cl_setCloudIsBackup(); 
912
 
        
913
 
        unlock_(gDatabaseList);
914
 
        
915
 
        // Rename the database path so that it is obviouse that this is an incomplete backup database.
916
 
        // When the backup is completed it will be renamed back.
917
 
        CSPath *new_path = CSPath::newPath(myDatabasePath->concat("#"));
918
 
        push_(new_path);
919
 
        
920
 
        if (new_path->exists()) 
921
 
                new_path->remove();
922
 
        
923
 
        CSPath *db_path = CSPath::newPath(RETAIN(myDatabasePath));
924
 
        push_(db_path);
925
 
        
926
 
        db_path->rename(new_path->getNameCString());
927
 
        myDatabasePath->release();
928
 
        myDatabasePath = new_path->getString();
929
 
        myDatabasePath->retain();
930
 
        
931
 
        release_(db_path);
932
 
        release_(new_path);
933
 
        
934
 
        
935
 
        exit_();
936
 
}
937
 
 
938
 
void MSDatabase::releaseBackupDatabase()
939
 
{
940
 
        enter_();
941
 
 
942
 
        
943
 
        // The backup has completed succefully, rename the path to the correct name.
944
 
        CSPath *db_path = CSPath::newPath(myDatabasePath->getCString());
945
 
        push_(db_path);
946
 
        
947
 
        myDatabasePath->setLength(myDatabasePath->length()-1);
948
 
        db_path->rename(cs_last_name_of_path(myDatabasePath->getCString()));
949
 
        release_(db_path);
950
 
        
951
 
        // Remove the backup database object.
952
 
        lock_(gDatabaseList);
953
 
        gDatabaseArray->remove(myDatabaseID);
954
 
        MSTableList::removeDatabaseTables(this); // Will also release the database object.
955
 
        unlock_(gDatabaseList);
956
 
        
957
 
        
958
 
        exit_();
959
 
}
960
 
 
961
 
void MSDatabase::startBackup(MSBackupInfo *backup_info)
962
 
{
963
 
        enter_();
964
 
 
965
 
        push_(backup_info);
966
 
        if (iBackupThread) {
967
 
                if (iBackupThread->isRunning()) {
968
 
                        CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, "A backup is still running.");
969
 
                }
970
 
                iBackupThread->release();
971
 
                iBackupThread = NULL;
972
 
        }
973
 
        
974
 
        pop_(backup_info);
975
 
        iBackupThread = MSBackup::newMSBackup(backup_info);
976
 
        
977
 
        try_(a) {
978
 
                iBackupThread->startBackup(RETAIN(this));
979
 
        }
980
 
        
981
 
        catch_(a) {
982
 
                iBackupThread->release();
983
 
                iBackupThread = NULL;
984
 
                throw_();
985
 
        }
986
 
        cont_(a);
987
 
        
988
 
        exit_();
989
 
}
990
 
 
991
 
bool MSDatabase::backupStatus(uint64_t *total, uint64_t *completed, bool *completed_ok)
992
 
{
993
 
        bool done;
994
 
        
995
 
        enter_();
996
 
        
997
 
        if (iBackupThread) {
998
 
                *total = iBackupThread->getBackupSize();
999
 
                *completed = iBackupThread->getBackupCompletedSize();
1000
 
                done = !iBackupThread->isRunning();
1001
 
                *completed = (iBackupThread->getStatus() == 0);
1002
 
        } else {
1003
 
                *completed_ok = done = true;
1004
 
                *total = *completed = 0;                        
1005
 
        }
1006
 
                
1007
 
        return_(done);
1008
 
}
1009
 
 
1010
 
uint32_t MSDatabase::backupID()
1011
 
1012
 
        return (iBackupThread)?iBackupThread->backupID(): 0;
1013
 
}
1014
 
 
1015
 
void MSDatabase::terminateBackup()
1016
 
{
1017
 
        if (iBackupThread) {
1018
 
                iBackupThread->stop();
1019
 
                iBackupThread->release();
1020
 
                iBackupThread = NULL;
1021
 
        }
1022
 
}
1023
 
 
1024
 
MSDatabase *MSDatabase::getDatabase(CSString *db_name, bool create)
1025
 
{
1026
 
        MSDatabase *db;
1027
 
        enter_();
1028
 
        push_(db_name);
1029
 
        
1030
 
        
1031
 
        lock_(gDatabaseList);
1032
 
        if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1033
 
                db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1034
 
                if (!db)
1035
 
                        goto exit;
1036
 
        } else
1037
 
                db->retain();
1038
 
        
1039
 
        exit:
1040
 
        unlock_(gDatabaseList);
1041
 
        release_(db_name);
1042
 
        return_(db);
1043
 
}
1044
 
 
1045
 
MSDatabase *MSDatabase::getDatabase(const char *db_name, bool create)
1046
 
{
1047
 
        return getDatabase(CSString::newString(db_name), create);
1048
 
}
1049
 
 
1050
 
MSDatabase *MSDatabase::getDatabase(uint32_t db_id)
1051
 
{
1052
 
        MSDatabase *db;
1053
 
        
1054
 
        enter_();
1055
 
        lock_(gDatabaseList);
1056
 
        if ((db = (MSDatabase *) gDatabaseArray->get((uint32_t) db_id))) 
1057
 
                db->retain();
1058
 
        else {
1059
 
                // Look for the database folder with the correct ID:
1060
 
                CSPath *path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1061
 
                push_(path);
1062
 
                if (path->exists()) {
1063
 
                        CSDirectory *dir;
1064
 
                        dir = CSDirectory::newDirectory(RETAIN(path));
1065
 
                        push_(dir);
1066
 
                        dir->open();
1067
 
                        
1068
 
                        while (dir->next() && !db) {
1069
 
                                if (!dir->isFile()) {
1070
 
                                        const char *ptr, *dir_name  = dir->name();
1071
 
                                        ptr = dir_name + strlen(dir_name) -1;
1072
 
                                        
1073
 
                                        while (ptr > dir_name && *ptr != '-') ptr--;
1074
 
                                        
1075
 
                                        if (*ptr ==  '-') {
1076
 
                                                int len = ptr - dir_name;
1077
 
                                                ptr++;
1078
 
                                                if ((strtoul(ptr, NULL, 10) == db_id) && len) {
1079
 
                                                        db = getDatabase(CSString::newString(dir_name, len), true);
1080
 
                                                        ASSERT(db->myDatabaseID == db_id);
1081
 
                                                }
1082
 
                                        }
1083
 
                                }
1084
 
                        }
1085
 
                        release_(dir);
1086
 
                }
1087
 
                release_(path);         
1088
 
        }
1089
 
        unlock_(gDatabaseList);
1090
 
        
1091
 
        if (!db) {
1092
 
                char buffer[CS_EXC_MESSAGE_SIZE];
1093
 
 
1094
 
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database #");
1095
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) db_id);
1096
 
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
1097
 
        }
1098
 
        return_(db);
1099
 
}
1100
 
 
1101
 
void MSDatabase::wakeTempLogThreads()
1102
 
{
1103
 
        MSDatabase *db;
1104
 
 
1105
 
        if (!gDatabaseList)
1106
 
                return;
1107
 
        enter_();
1108
 
        lock_(gDatabaseList);
1109
 
        for (int i=0;;i++) {
1110
 
                if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
1111
 
                        break;
1112
 
                if (db->myTempLogThread)
1113
 
                        db->myTempLogThread->wakeup();
1114
 
        }
1115
 
        unlock_(gDatabaseList);
1116
 
        exit_();
1117
 
}
1118
 
 
1119
 
uint32_t MSDatabase::getDBID(CSPath *path, CSString *db_name)
1120
 
{
1121
 
        CSDirectory             *dir;
1122
 
        uint32_t                        db_id = 0;
1123
 
        int                             len = db_name->length();
1124
 
        const char              *ptr;
1125
 
        
1126
 
        enter_();
1127
 
        push_(db_name);
1128
 
        push_(path);
1129
 
        
1130
 
        // Search for the ID of the database
1131
 
        dir = CSDirectory::newDirectory(RETAIN(path));
1132
 
        push_(dir);
1133
 
        dir->open();
1134
 
        while (dir->next() && !db_id) 
1135
 
                {
1136
 
                if (!dir->isFile()){
1137
 
                        ptr = dir->name() + strlen(dir->name()) -1;
1138
 
                        while (ptr > dir->name() && isdigit(*ptr)) ptr--;
1139
 
                        if ((*ptr == '-') && (len == (ptr - dir->name())) && !db_name->compare(dir->name(), len) ) {
1140
 
                                db_id = atol(ptr+1);                            
1141
 
                        }
1142
 
                }
1143
 
        }
1144
 
        release_(dir);
1145
 
        
1146
 
        if (!db_id) {
1147
 
                db_id = time(NULL);
1148
 
 
1149
 
                while (1) { // search for a unique db_id
1150
 
                        dir = CSDirectory::newDirectory(RETAIN(path));
1151
 
                        push_(dir);
1152
 
                        dir->open();
1153
 
                        while (db_id && dir->next()) {
1154
 
                                if (!dir->isFile()) {
1155
 
                                        ptr = dir->name() + strlen(dir->name()) -1;
1156
 
                                        while (ptr > dir->name() && isdigit(*ptr)) ptr--;
1157
 
                                        if ((*ptr == '-') && (db_id == strtoul(ptr+1, NULL, 10))) {
1158
 
                                                db_id = 0;                              
1159
 
                                        }
1160
 
                                }
1161
 
                        }
1162
 
                        release_(dir);
1163
 
                        if (db_id)
1164
 
                                break;
1165
 
                        sleep(1); // Allow 1 second to pass.
1166
 
                        db_id = time(NULL);
1167
 
                } 
1168
 
        }
1169
 
        
1170
 
        release_(path);
1171
 
        release_(db_name);
1172
 
        return_(db_id);
1173
 
}
1174
 
 
1175
 
CSPath *MSDatabase::createDatabasePath(const char *location, CSString *db_name, uint32_t *db_id_ptr, bool *create, bool is_pbms)
1176
 
{
1177
 
        bool create_path = *create;     
1178
 
        CSPath *path = NULL;
1179
 
        char name_buffer[MS_DATABASE_NAME_SIZE + 40];
1180
 
        uint32_t db_id;
1181
 
        enter_();
1182
 
        
1183
 
        push_(db_name);
1184
 
        *create = false;
1185
 
        path = CSPath::newPath(location, "pbms");
1186
 
        push_(path);
1187
 
        if (!path->exists()) {
1188
 
                if (!create_path){
1189
 
                        release_(path);
1190
 
                        path = NULL;
1191
 
                        goto done;
1192
 
                }
1193
 
                        
1194
 
                *create = true;
1195
 
                path->makeDir();
1196
 
        }
1197
 
 
1198
 
        // If this is the pbms database then nothing more is to be done.
1199
 
        if (is_pbms)
1200
 
                goto done;
1201
 
        
1202
 
        if ((!db_id_ptr) || !*db_id_ptr) {
1203
 
                db_id = getDBID(RETAIN(path), RETAIN(db_name));
1204
 
                if (db_id_ptr)
1205
 
                        *db_id_ptr = db_id;
1206
 
        } else
1207
 
                db_id = *db_id_ptr;
1208
 
 
1209
 
        // Create the PBMS database name with ID
1210
 
        cs_strcpy(MS_DATABASE_NAME_SIZE + 40, name_buffer, db_name->getCString());
1211
 
        cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, "-");
1212
 
        cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, (uint32_t) db_id);
1213
 
                        
1214
 
        pop_(path);
1215
 
        path = CSPath::newPath(path, name_buffer);
1216
 
        push_(path);
1217
 
        if (!path->exists()) {
1218
 
                if (create_path) {
1219
 
                        *create = true;
1220
 
                        path->makeDir();
1221
 
                } else {
1222
 
                        release_(path);
1223
 
                        path = NULL;
1224
 
                }
1225
 
                        
1226
 
        }
1227
 
        
1228
 
done:
1229
 
        if (path)
1230
 
                pop_(path);
1231
 
        release_(db_name);
1232
 
        return_(path);
1233
 
}
1234
 
 
1235
 
 
1236
 
 
1237
 
MSDatabase *MSDatabase::newDatabase(const char *db_location, CSString *db_name, uint32_t        db_id, bool create)
1238
 
{
1239
 
        MSDatabase              *db = NULL;
1240
 
        CSDirectory             *dir;
1241
 
        MSRepository    *repo;
1242
 
        CSPath                  *path;
1243
 
        const char              *file_name;
1244
 
        uint32_t                file_id;
1245
 
        off64_t                 file_size;
1246
 
        MSTempLog               *log;
1247
 
        uint32_t                to_delete = 0;
1248
 
        CSString                *db_path;
1249
 
        bool                    is_pbms = false;
1250
 
 
1251
 
        enter_();
1252
 
 
1253
 
        push_(db_name);
1254
 
 
1255
 
        //is_pbms = (strcmp(db_name->getCString(), "pbms") == 0); To be done later.
1256
 
        
1257
 
        /*
1258
 
         * Block the creation of the pbms database if there is no MySQL database. 
1259
 
         * The database name is case sensitive here if the file system names are
1260
 
         * case sensitive. This is desirable.
1261
 
         */
1262
 
        path = CSPath::newPath(ms_my_get_mysql_home_path(), RETAIN(db_name));
1263
 
        push_(path);
1264
 
        if (create && !path->exists()) {
1265
 
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1266
 
        }
1267
 
        release_(path);
1268
 
        
1269
 
         // Create the database path, if 'create' == false then it can return NULL
1270
 
        path = createDatabasePath(db_location, RETAIN(db_name), &db_id, &create, is_pbms);
1271
 
        if (!path) {
1272
 
                release_(db_name);
1273
 
                return_(NULL);
1274
 
        }
1275
 
        push_(path);
1276
 
        
1277
 
        // Create the database object and initialize it.
1278
 
        if (!(db = new MSDatabase())) {
1279
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
1280
 
        }
1281
 
        
1282
 
        db->myIsPBMS = is_pbms;
1283
 
        db_path = path->getString();
1284
 
        db_path->retain();
1285
 
        release_(path);
1286
 
        path = NULL;
1287
 
                
1288
 
        db->iNextBlobRefId = (uint32_t) time(NULL);
1289
 
        db->iNextBlobRefId <<= 32;
1290
 
        db->iNextBlobRefId = COMMIT_MASK(db->iNextBlobRefId);
1291
 
        db->iNextBlobRefId++;
1292
 
        
1293
 
        db->myDatabaseID = db_id;
1294
 
        db->myDatabasePath = db_path;
1295
 
        db->myDatabaseName = db_name;
1296
 
        new_(db->myBlobCloud, CloudDB(db_id));
1297
 
 
1298
 
        pop_(db_name);  
1299
 
        
1300
 
        push_(db);
1301
 
        
1302
 
        
1303
 
        new_(db->myTempLogArray, CSSyncSparseArray(20));
1304
 
        new_(db->iTableList, CSSyncSortedList());
1305
 
        new_(db->iTableArray, CSSparseArray(20));
1306
 
        new_(db->myRepostoryList, CSSyncVector(20));
1307
 
        
1308
 
        if (!is_pbms) { //
1309
 
#ifdef HAVE_ALIAS_SUPPORT
1310
 
                //db->retain(); no retain here, MSAlias() takes a back ref.
1311
 
                new_(db->iBlobAliases, MSAlias(db));
1312
 
#endif          
1313
 
                /* "Load" the database: */
1314
 
 
1315
 
                /* Get the max table ID: */
1316
 
                dir = CSDirectory::newDirectory(RETAIN(db_path));
1317
 
                push_(dir);
1318
 
                dir->open();
1319
 
                while (dir->next()) {
1320
 
                        file_name = dir->name();
1321
 
                        if (dir->isFile() && cs_is_extension(file_name, "bst"))
1322
 
                                db->addTableFromFile(dir, file_name, false);
1323
 
                }
1324
 
                release_(dir);
1325
 
 
1326
 
                path = CSPath::newPath(RETAIN(db_path), "bs-repository");
1327
 
                if (path->exists()) {
1328
 
                        dir = CSDirectory::newDirectory(path);
1329
 
                        push_(dir);
1330
 
                        dir->open();
1331
 
                        while (dir->next()) {
1332
 
                                file_name = dir->name();
1333
 
                                if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1334
 
                                        if ((file_id = fileToTableId(file_name, "repo"))) {
1335
 
                                                dir->info(NULL, &file_size, NULL);
1336
 
                                                new_(repo, MSRepository(file_id, db, file_size));
1337
 
                                                db->myRepostoryList->set(file_id - 1, repo);
1338
 
                                        }
1339
 
                                }
1340
 
                        }
1341
 
                        release_(dir);
1342
 
                }
1343
 
                else {
1344
 
                        path->makeDir();
1345
 
                        path->release();
1346
 
                }
1347
 
 
1348
 
                path = CSPath::newPath(RETAIN(db_path), "bs-logs");
1349
 
                if (path->exists()) {
1350
 
                        dir = CSDirectory::newDirectory(path);
1351
 
                        push_(dir);
1352
 
                        dir->open();
1353
 
                        while (dir->next()) {
1354
 
                                file_name = dir->name();
1355
 
                                if (dir->isFile()) {
1356
 
                                        if (cs_is_extension(file_name, "bs")) {
1357
 
                                                if ((file_id = fileToTableId(file_name, "temp"))) {
1358
 
                                                        dir->info(NULL, &file_size, NULL);
1359
 
                                                        new_(log, MSTempLog(file_id, db, file_size));
1360
 
                                                        db->myTempLogArray->set(file_id, log);
1361
 
                                                }
1362
 
                                        }
1363
 
                                        else if (cs_is_extension(file_name, "bst")) {
1364
 
                                                db->addTableFromFile(dir, file_name, true);
1365
 
                                                to_delete++;
1366
 
                                        }
1367
 
                                }
1368
 
                        }
1369
 
                        release_(dir);
1370
 
                }
1371
 
                else {
1372
 
                        path->makeDir();
1373
 
                        path->release();
1374
 
                }
1375
 
 
1376
 
                if (to_delete) {
1377
 
                        /* Go through and prepare all the tables that are to
1378
 
                         * be deleted:
1379
 
                         */
1380
 
                        uint32_t        i = 0;
1381
 
                        MSTable *tab;
1382
 
                        
1383
 
                        while ((tab = (MSTable *) db->iTableList->itemAt(i))) {
1384
 
                                if (tab->isToDelete())
1385
 
                                        tab->prepareToDelete();
1386
 
                                i++;
1387
 
                        }
1388
 
                }
1389
 
 
1390
 
        }
1391
 
        pop_(db);
1392
 
 
1393
 
        return_(db);
1394
 
}
1395
 
 
1396
 
void MSDatabase::startThreads()
1397
 
{
1398
 
        enter_();
1399
 
 
1400
 
        if (myIsPBMS)
1401
 
                exit_();
1402
 
                
1403
 
#ifdef HAVE_ALIAS_SUPPORT
1404
 
        // iBlobAliases->ma_open() must be called before starting any threads.
1405
 
        iBlobAliases->ma_open(); 
1406
 
#endif
1407
 
 
1408
 
        new_(myTempLogThread, MSTempLogThread(1 * 1000, this));
1409
 
        myTempLogThread->start();
1410
 
 
1411
 
#ifdef MS_COMPACTOR_POLLS
1412
 
        new_(myCompactorThread, MSCompactorThread(MS_COMPACTOR_POLL_FREQ, this));
1413
 
#else
1414
 
        new_(myCompactorThread, MSCompactorThread(MS_DEFAULT_COMPACTOR_WAIT * 1000, this));
1415
 
#endif
1416
 
 
1417
 
        
1418
 
        myCompactorThread->start();
1419
 
        exit_();
1420
 
}
1421
 
 
1422
 
 
1423
 
void MSDatabase::dropDatabase() 
1424
 
{
1425
 
        enter_();
1426
 
 
1427
 
        iDropping = true;
1428
 
        iClosing = true;
1429
 
        
1430
 
        if (iBackupThread) {
1431
 
                iBackupThread->stop();
1432
 
                iBackupThread->release();
1433
 
                iBackupThread = NULL;
1434
 
        }
1435
 
        
1436
 
        if (myTempLogThread) {
1437
 
                myTempLogThread->stop();
1438
 
                myTempLogThread->release();
1439
 
                myTempLogThread = NULL;
1440
 
        }
1441
 
        
1442
 
        if (myCompactorThread) {
1443
 
                myRepostoryList->wakeup(); // The compator thread waits on this.
1444
 
                myCompactorThread->stop();
1445
 
                myCompactorThread->release();
1446
 
                myCompactorThread = NULL;
1447
 
        }
1448
 
        
1449
 
        // Call cloud drop database even if the database is not currrently
1450
 
        // using cloud storage just in case to was in the past. If the connection
1451
 
        // to the cloud is not setup then nothing will be done.
1452
 
        try_(a) {
1453
 
                myBlobCloud->cl_dropDB();
1454
 
        }
1455
 
        catch_(a) {
1456
 
                self->logException();
1457
 
        }
1458
 
        cont_(a);
1459
 
        exit_();
1460
 
}
1461
 
 
1462
 
void MSDatabase::removeDatabasePath(CSString *doomedDatabasePath ) 
1463
 
{
1464
 
        CSPath *path = NULL;
1465
 
        CSDirectory *dir = NULL;
1466
 
        const char *file_name;
1467
 
        enter_();
1468
 
        
1469
 
        push_(doomedDatabasePath);
1470
 
        
1471
 
        // Delete repository files
1472
 
        path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-repository");
1473
 
        push_(path);
1474
 
        if (path->exists()) {
1475
 
                dir = CSDirectory::newDirectory(RETAIN(path));
1476
 
                push_(dir);
1477
 
                dir->open();
1478
 
                while (dir->next()) {
1479
 
                        file_name = dir->name();
1480
 
                        if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1481
 
                                dir->deleteEntry();
1482
 
                        }
1483
 
                }
1484
 
                release_(dir);
1485
 
                if (path->isEmpty())
1486
 
                        path->removeDir();
1487
 
        }
1488
 
        release_(path);
1489
 
 
1490
 
        // Delete temp log files.
1491
 
        path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-logs");
1492
 
        push_(path);
1493
 
        if (path->exists()) {
1494
 
                dir = CSDirectory::newDirectory(RETAIN(path));
1495
 
                push_(dir);
1496
 
                dir->open();
1497
 
                while (dir->next()) {
1498
 
                        file_name = dir->name();
1499
 
                        if (dir->isFile() && (cs_is_extension(file_name, "bs") || cs_is_extension(file_name, "bst"))) {
1500
 
                                dir->deleteEntry();
1501
 
                        }
1502
 
                }
1503
 
                release_(dir);
1504
 
                if (path->isEmpty())
1505
 
                        path->removeDir();
1506
 
        }
1507
 
        release_(path);
1508
 
 
1509
 
        // Delete table reference files.
1510
 
        dir = CSDirectory::newDirectory(RETAIN(doomedDatabasePath));
1511
 
        push_(dir);
1512
 
        dir->open();
1513
 
        while (dir->next()) {
1514
 
                file_name = dir->name();
1515
 
                if (dir->isFile() && cs_is_extension(file_name, "bst"))
1516
 
                        dir->deleteEntry();
1517
 
        }
1518
 
        release_(dir);
1519
 
 
1520
 
#ifdef HAVE_ALIAS_SUPPORT
1521
 
        path = CSPath::newPath(RETAIN(doomedDatabasePath), ACTIVE_ALIAS_INDEX);
1522
 
        push_(path);
1523
 
        path->removeFile();
1524
 
        release_(path);
1525
 
#endif
1526
 
 
1527
 
        PBMSSystemTables::removeSystemTables(RETAIN(doomedDatabasePath));
1528
 
        
1529
 
        path = CSPath::newPath(RETAIN(doomedDatabasePath));
1530
 
        push_(path);
1531
 
        if (path->isEmpty() && !path->isLink()) {
1532
 
                path->removeDir();
1533
 
        } else { 
1534
 
                CSStringBuffer *new_name;
1535
 
                // If the database folder is not empty we rename it to get it out of the way.
1536
 
                // If it is not renamed it will be reused if a database with the same name is
1537
 
                // created again which will result in the database ID being reused which may
1538
 
                // have some bad side effects.
1539
 
                new_(new_name, CSStringBuffer());
1540
 
                push_(new_name);
1541
 
                new_name->append(cs_last_name_of_path(doomedDatabasePath->getCString()));
1542
 
                new_name->append("_DROPPED");
1543
 
                path->rename(new_name->getCString());
1544
 
                release_(new_name);
1545
 
        }
1546
 
        release_(path);
1547
 
 
1548
 
        release_(doomedDatabasePath);
1549
 
        
1550
 
        path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1551
 
        push_(path);
1552
 
        if (path->isEmpty() && !path->isLink()) {
1553
 
                path->removeDir();
1554
 
        }
1555
 
        release_(path);
1556
 
 
1557
 
        exit_();
1558
 
}
1559
 
 
1560
 
/* Drop the PBMS database if it exists.
1561
 
 * The root folder 'pbms' will be deleted also
1562
 
 * if it is empty and not a symbolic link.
1563
 
 * The database folder in 'pbms' is deleted if it is empty and
1564
 
 * it is not a symbolic link.
1565
 
 */
1566
 
void MSDatabase::dropDatabase(MSDatabase *doomedDatabase, const char *db_name ) 
1567
 
{
1568
 
        CSString *doomedDatabasePath = NULL;
1569
 
 
1570
 
        enter_();
1571
 
        
1572
 
        if (doomedDatabase) {
1573
 
                push_(doomedDatabase);
1574
 
                
1575
 
                // Remove any pending transactions for the dropped database.
1576
 
                // This is important because if the database is restored it will have the
1577
 
                // same database ID and the old transactions would be applied to it.
1578
 
                MSTransactionManager::dropDatabase(doomedDatabase->myDatabaseID);
1579
 
                
1580
 
                doomedDatabasePath = doomedDatabase->myDatabasePath;
1581
 
                doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1582
 
                
1583
 
                MSTableList::removeDatabaseTables(RETAIN(doomedDatabase));
1584
 
                MSSystemTableShare::removeDatabaseSystemTables(RETAIN(doomedDatabase));
1585
 
 
1586
 
                doomedDatabase->dropDatabase(); // Shutdown database threads.
1587
 
                
1588
 
                // To avoid a deadlock a lock is not taken on the database list
1589
 
                // if shutdown is in progress. The only database that would be
1590
 
                // dropped during a shutdown is an incomplete backup database.
1591
 
                ASSERT(doomedDatabase->isBackup || !self->myMustQuit);
1592
 
                if (!self->myMustQuit) 
1593
 
                        lock_(gDatabaseList); // Be sure to shutdown the database before locking this or it can lead to deadlocks
1594
 
                        
1595
 
                gDatabaseArray->remove(doomedDatabase->myDatabaseID);
1596
 
                if (!doomedDatabase->isBackup)
1597
 
                        gDatabaseList->remove(doomedDatabase->getKey());
1598
 
                if (!self->myMustQuit) 
1599
 
                        unlock_(gDatabaseList); 
1600
 
                ASSERT(doomedDatabase->getRefCount() == 1);
1601
 
                release_(doomedDatabase);
1602
 
                
1603
 
        } else {
1604
 
                CSPath *path;
1605
 
                bool create = false;
1606
 
                uint32_t db_id;
1607
 
                
1608
 
                path = createDatabasePath(ms_my_get_mysql_home_path(), CSString::newString(db_name), &db_id, &create);
1609
 
                
1610
 
                if (path) {
1611
 
                        MSTransactionManager::dropDatabase(db_id);
1612
 
 
1613
 
                        push_(path);
1614
 
                        doomedDatabasePath = path->getString();
1615
 
                        doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1616
 
                        release_(path);
1617
 
                }
1618
 
        }
1619
 
        
1620
 
        if (doomedDatabasePath)
1621
 
                removeDatabasePath(doomedDatabasePath);
1622
 
        
1623
 
        exit_();
1624
 
}
1625
 
 
1626
 
void MSDatabase::dropDatabase(const char *db_name ) 
1627
 
{
1628
 
        enter_();
1629
 
        dropDatabase(getDatabase(db_name, false), db_name);
1630
 
        exit_();
1631
 
}
1632
 
 
1633
 
// The table_path can be several things here:
1634
 
// 1: <absalute path>/<database>/<table>
1635
 
// 2: <absalute path>/<database>
1636
 
// 3: <database>/<table>
1637
 
bool MSDatabase::convertTablePathToIDs(const char *table_path, uint32_t *db_id, uint32_t *tab_id, bool create) 
1638
 
{
1639
 
        const char      *base = ms_my_get_mysql_home_path();
1640
 
        CSString        *table_url;
1641
 
        CSString        *db_path = NULL;
1642
 
        CSString        *db_name = NULL;
1643
 
        CSString        *tab_name = NULL;
1644
 
        MSDatabase      *db;
1645
 
        enter_();
1646
 
        
1647
 
        *db_id = 0;
1648
 
        *tab_id = 0;
1649
 
        
1650
 
        table_url = CSString::newString(table_path);
1651
 
        if (table_url->startsWith(base)) {
1652
 
                table_url = table_url->right(base);
1653
 
        }
1654
 
        push_(table_url);
1655
 
 
1656
 
 
1657
 
        db_path = table_url->left("/", -1);
1658
 
        push_(db_path);
1659
 
        tab_name = table_url->right("/", -1);
1660
 
        
1661
 
        pop_(db_path);
1662
 
        release_(table_url);
1663
 
 
1664
 
        if (db_path->length() == 0) { // Only a database name was supplied.
1665
 
                db_path->release();
1666
 
                db_name = tab_name;
1667
 
                tab_name = NULL;
1668
 
        } else {
1669
 
                if (tab_name->length() == 0) {
1670
 
                        tab_name->release();
1671
 
                        tab_name = NULL;
1672
 
                } else
1673
 
                        push_(tab_name);
1674
 
                push_(db_path);
1675
 
                db_name = db_path->right("/", -1);
1676
 
                pop_(db_path);
1677
 
                if (db_name->length() == 0) {
1678
 
                        db_name->release();
1679
 
                        db_name = db_path;
1680
 
                } else {
1681
 
                        db_path->release();
1682
 
                        db_path = NULL;
1683
 
                }
1684
 
        }
1685
 
        
1686
 
        db = MSDatabase::getDatabase(db_name, create); // This will release db_name
1687
 
        if (db) {
1688
 
                *db_id = db->myDatabaseID;
1689
 
                if (tab_name) {
1690
 
                        MSTable         *tab;
1691
 
                        pop_(tab_name);
1692
 
                        push_(db);
1693
 
                        tab = db->getTable(tab_name, create);// This will release tab_name
1694
 
                        pop_(db);
1695
 
                        if (tab) {
1696
 
                                *tab_id = tab->myTableID;
1697
 
                                tab->release();
1698
 
                        }
1699
 
                }
1700
 
                        
1701
 
                db->release();
1702
 
        }
1703
 
        
1704
 
        return_((*tab_id > 0) && (*db_id > 0));
1705
 
}
1706
 
 
1707
 
bool MSDatabase::convertTableAndDatabaseToIDs(const char *db_name, const char *tab_name, uint32_t *db_id, uint32_t *tab_id, bool create) 
1708
 
{
1709
 
        MSDatabase      *db;
1710
 
        enter_();
1711
 
        
1712
 
        *db_id = 0;
1713
 
        *tab_id = 0;
1714
 
        
1715
 
        db = MSDatabase::getDatabase(db_name, create); 
1716
 
        if (db) {
1717
 
                push_(db);
1718
 
                *db_id = db->myDatabaseID;
1719
 
                if (tab_name) {
1720
 
                        MSTable         *tab;
1721
 
                        tab = db->getTable(tab_name, create);
1722
 
                        if (tab) {
1723
 
                                *tab_id = tab->myTableID;
1724
 
                                tab->release();
1725
 
                        }
1726
 
                }
1727
 
                        
1728
 
                release_(db);
1729
 
        }
1730
 
        
1731
 
        return_((*tab_id > 0) && (*db_id > 0));
1732
 
}
1733
 
 
1734
 
MSDatabase *MSDatabase::loadDatabase(CSString *db_name, bool create)
1735
 
{
1736
 
        MSDatabase *db;
1737
 
        enter_();
1738
 
        
1739
 
        db = newDatabase(ms_my_get_mysql_home_path(), db_name, 0, create);
1740
 
        
1741
 
        if (db) {
1742
 
                push_(db);
1743
 
                
1744
 
                gDatabaseList->add(RETAIN(db));
1745
 
                
1746
 
                gDatabaseArray->set(db->myDatabaseID, RETAIN(db));
1747
 
                db->startThreads();
1748
 
                PBMSSystemTables::loadSystemTables(RETAIN(db));
1749
 
                        
1750
 
                pop_(db);
1751
 
        }
1752
 
        return_(db);
1753
 
}
1754
 
 
1755
 
uint32_t MSDatabase::getDatabaseID(CSString *db_name, bool create)
1756
 
{
1757
 
        MSDatabase *db;
1758
 
        uint32_t id = 0;
1759
 
        enter_();
1760
 
        push_(db_name);
1761
 
        
1762
 
        
1763
 
        lock_(gDatabaseList);
1764
 
        if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1765
 
                db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1766
 
                if (!db)
1767
 
                        goto exit;
1768
 
                id = db->myDatabaseID;
1769
 
                db->release();
1770
 
        } else
1771
 
                id = db->myDatabaseID;
1772
 
        
1773
 
        exit:
1774
 
        unlock_(gDatabaseList);
1775
 
        release_(db_name);
1776
 
        return_(id);
1777
 
}
1778
 
 
1779
 
 
1780
 
uint32_t MSDatabase::getDatabaseID(const char *db_name, bool create)
1781
 
{
1782
 
        return getDatabaseID(CSString::newString(db_name), create);
1783
 
}
1784
 
 
1785
 
MSDatabase *MSDatabase::getBackupDatabase(CSString *db_location, CSString *db_name, uint32_t db_id, bool create)
1786
 
{
1787
 
        bool was_created = create; 
1788
 
        CSPath *path;
1789
 
        MSDatabase *db;
1790
 
        enter_();
1791
 
        
1792
 
        push_(db_location);
1793
 
        push_(db_name);
1794
 
        // If the db already exists and 'create' == true then the existing db
1795
 
        // must be deleted.
1796
 
        // Create the database path, if 'create' == false then it can return NULL
1797
 
        path = createDatabasePath(db_location->getCString(), RETAIN(db_name), &db_id, &was_created);
1798
 
        if (!path) {
1799
 
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1800
 
        }
1801
 
        push_(path);
1802
 
        
1803
 
        // If we wanted to create it but it already exists then throw an error.
1804
 
        if ( create && !was_created) {
1805
 
                char str[120];
1806
 
                snprintf(str, 120, "Duplicate database: %s", db_name->getCString());
1807
 
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, str);
1808
 
        }
1809
 
                
1810
 
        release_(path);
1811
 
        pop_(db_name);
1812
 
        // everything looks OK
1813
 
        db = newDatabase(db_location->getCString(), db_name, db_id, create);
1814
 
        db->setBackupDatabase();
1815
 
        release_(db_location);
1816
 
        return_(db);
1817
 
}
1818