~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/database_ms.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
merge lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original author: Paul McCullagh
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-05-25
23
 
 *
24
 
 * H&G2JCtL
25
 
 *
26
 
 * PBMS database management.
27
 
 *
28
 
 */
29
 
 
30
 
#ifdef DRIZZLED
31
 
#include <config.h>
32
 
#include <drizzled/common.h>
33
 
#include <drizzled/session.h>
34
 
#include <drizzled/table.h>
35
 
#include <drizzled/message/table.pb.h>
36
 
#include <drizzled/charset.h>
37
 
#include <drizzled/table_proto.h>
38
 
#include <drizzled/field.h>
39
 
#endif
40
 
 
41
 
#include "cslib/CSConfig.h"
42
 
 
43
 
#include <string.h>
44
 
#include <ctype.h>
45
 
#include <inttypes.h>
46
 
 
47
 
#include "string.h"
48
 
 
49
 
#include "cslib/CSGlobal.h"
50
 
#include "cslib/CSLog.h"
51
 
#include "cslib/CSDirectory.h"
52
 
#include "cslib/CSStrUtil.h"
53
 
 
54
 
#include "database_ms.h"
55
 
#include "open_table_ms.h"
56
 
#include "backup_ms.h"
57
 
#include "table_ms.h"
58
 
#include "temp_log_ms.h"
59
 
#include "network_ms.h"
60
 
#include "mysql_ms.h"
61
 
#include "pbmslib.h"
62
 
#include "transaction_ms.h"
63
 
//#include "systab_variable_ms.h"
64
 
#include "systab_httpheader_ms.h"
65
 
#include "parameters_ms.h"
66
 
#include "pbmsdaemon_ms.h"
67
 
 
68
 
 
69
 
 
70
 
CSSyncSortedList        *MSDatabase::gDatabaseList;
71
 
CSSparseArray           *MSDatabase::gDatabaseArray;
72
 
/*
73
 
 * -------------------------------------------------------------------------
74
 
 * PBMS DATABASES
75
 
 */
76
 
 
77
 
/*
 * Construct an empty database object: pointer members start out NULL,
 * flags false, counters zero and the BLOB storage type is set to
 * MS_STANDARD_STORAGE.
 */
MSDatabase::MSDatabase()
	: myIsPBMS(false)
	, myDatabaseID(0)
	, myDatabaseName(NULL)
	, myDatabasePath(NULL)
	, myTempLogArray(NULL)
	, myCompactorThread(NULL)
	, myTempLogThread(NULL)
	, myRepostoryList(NULL)
	, myBlobCloud(NULL)
	, myBlobType(MS_STANDARD_STORAGE)
	, isBackup(false)
	, iBackupThread(NULL)
	, iBackupTime(0)
	, iRecovering(false)
#ifdef HAVE_ALIAS_SUPPORT
	, iBlobAliases(NULL)
#endif
	, iClosing(false)
	, iTableList(NULL)
	, iTableArray(NULL)
	, iMaxTableID(0)
	, iWriteTempLog(NULL)
	, iDropping(false)
	, iNextBlobRefId(0)
{
	/* Object tracking is disabled: startTracking(); */
}
105
 
 
106
 
MSDatabase::~MSDatabase()
107
 
{
108
 
        iClosing = true;
109
 
        if (iBackupThread) {
110
 
                iBackupThread->stop();
111
 
                iBackupThread->release();
112
 
                iBackupThread = NULL;
113
 
        }
114
 
        
115
 
        if (myTempLogThread) {
116
 
                myTempLogThread->stop();
117
 
                myTempLogThread->release();
118
 
                myTempLogThread = NULL;
119
 
        }
120
 
        if (myCompactorThread) {
121
 
                myRepostoryList->wakeup(); // The compator thread waits on this.
122
 
                myCompactorThread->stop();
123
 
                myCompactorThread->release();
124
 
                myCompactorThread = NULL;
125
 
        }
126
 
        
127
 
        if (myDatabasePath)
128
 
                myDatabasePath->release();
129
 
                
130
 
        if (myDatabaseName)
131
 
                myDatabaseName->release();
132
 
                
133
 
        iWriteTempLog = NULL;
134
 
        if (myTempLogArray) {
135
 
                myTempLogArray->clear();
136
 
                myTempLogArray->release();
137
 
        }
138
 
        if (iTableList) {
139
 
                iTableList->clear();
140
 
                iTableList->release();
141
 
        }
142
 
        if (iTableArray){
143
 
                iTableArray->clear();
144
 
                iTableArray->release();
145
 
        }
146
 
        if (myRepostoryList) {
147
 
                myRepostoryList->clear();
148
 
                myRepostoryList->release();
149
 
        }
150
 
#ifdef HAVE_ALIAS_SUPPORT
151
 
        if (iBlobAliases) {
152
 
                iBlobAliases->ma_close();
153
 
                iBlobAliases->release();
154
 
        }
155
 
#endif
156
 
        if (myBlobCloud) {
157
 
                myBlobCloud->release();
158
 
        }
159
 
}
160
 
 
161
 
uint32_t MSDatabase::fileToTableId(const char *file_name, const char *name_part)
162
 
{
163
 
        uint32_t value = 0;
164
 
 
165
 
        if (file_name) {
166
 
                const char *num = file_name +  strlen(file_name) - 1;
167
 
                
168
 
                while (num >= file_name && *num != '-')
169
 
                        num--;
170
 
                if (name_part) {
171
 
                        /* Check the name part of the file: */
172
 
                        int len = strlen(name_part);
173
 
                        
174
 
                        if (len != num - file_name)
175
 
                                return 0;
176
 
                        if (strncmp(file_name, name_part, len) != 0)
177
 
                                return 0;
178
 
                }
179
 
                num++;
180
 
                if (isdigit(*num))
181
 
                        sscanf(num, "%"PRIu32"", &value);
182
 
        }
183
 
        return value;
184
 
}
185
 
 
186
 
const char *MSDatabase::fileToTableName(size_t size, char *tab_name, const char *file_name)
187
 
{
188
 
        const char      *cptr;
189
 
        size_t          len;
190
 
 
191
 
        file_name = cs_last_name_of_path(file_name);
192
 
        cptr = file_name + strlen(file_name) - 1;
193
 
        while (cptr > file_name && *cptr != '.')
194
 
                cptr--;
195
 
        if (cptr > file_name && *cptr == '.') {
196
 
                if (strncmp(cptr, ".bs", 2) == 0) {
197
 
                        cptr--;
198
 
                        while (cptr > file_name && isdigit(*cptr))
199
 
                                cptr--;
200
 
                }
201
 
        }
202
 
 
203
 
        len = cptr - file_name;
204
 
        if (len > size-1)
205
 
                len = size-1;
206
 
 
207
 
        memcpy(tab_name, file_name, len);
208
 
        tab_name[len] = 0;
209
 
 
210
 
        /* Return a pointer to what was removed! */
211
 
        return file_name + len;
212
 
}
213
 
 
214
 
 
215
 
const char *MSDatabase::getDatabaseNameCString()
216
 
{
217
 
        return myDatabaseName->getCString();
218
 
}
219
 
 
220
 
MSTable *MSDatabase::getTable(CSString *tab_name, bool create)
221
 
{
222
 
        MSTable *tab;
223
 
        
224
 
        enter_();
225
 
        push_(tab_name);
226
 
        lock_(iTableList);
227
 
        if (!(tab = (MSTable *) iTableList->find(tab_name))) {
228
 
 
229
 
                if (create) {
230
 
                        /* Create a new table: */
231
 
                        tab = MSTable::newTable(iMaxTableID+1, RETAIN(tab_name), this, (off64_t) 0, false);
232
 
                        iTableList->add(tab);
233
 
                        iTableArray->set(iMaxTableID+1, RETAIN(tab));
234
 
                        iMaxTableID++;
235
 
                }
236
 
        }
237
 
        if (tab)
238
 
                tab->retain();
239
 
        unlock_(iTableList);
240
 
        release_(tab_name);
241
 
        return_(tab);
242
 
}
243
 
 
244
 
MSTable *MSDatabase::getTable(const char *tab_name, bool create)
245
 
{
246
 
        return getTable(CSString::newString(tab_name), create);
247
 
}
248
 
 
249
 
 
250
 
MSTable *MSDatabase::getTable(uint32_t tab_id, bool missing_ok)
251
 
{
252
 
        MSTable *tab;
253
 
        
254
 
        enter_();
255
 
        lock_(iTableList);
256
 
        if (!(tab = (MSTable *) iTableArray->get((uint32_t) tab_id))) {
257
 
                if (missing_ok) {
258
 
                        unlock_(iTableList);
259
 
                        return_(NULL);
260
 
                }
261
 
                char buffer[CS_EXC_MESSAGE_SIZE];
262
 
 
263
 
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown table #");
264
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) tab_id);
265
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " in database ");
266
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, getDatabaseNameCString());
267
 
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, buffer);
268
 
        }
269
 
        tab->retain();
270
 
        unlock_(iTableList);
271
 
        return_(tab);
272
 
}
273
 
 
274
 
MSTable *MSDatabase::getNextTable(uint32_t *pos)
275
 
{
276
 
        uint32_t i = *pos;
277
 
        MSTable *tab = NULL;
278
 
        
279
 
        enter_();
280
 
        lock_(iTableList);
281
 
        while (i < iTableList->getSize()) {
282
 
                tab = (MSTable *) iTableList->itemAt(i++);
283
 
                if (!tab->isToDelete())
284
 
                        break;
285
 
                tab = NULL;
286
 
        }
287
 
        if (tab)
288
 
                tab->retain();
289
 
        unlock_(iTableList);
290
 
        *pos = i;
291
 
        return_(tab);
292
 
}
293
 
 
294
 
void MSDatabase::addTable(uint32_t tab_id, const char *tab_name, off64_t file_size, bool to_delete)
295
 
{
296
 
        MSTable *tab;
297
 
 
298
 
        if (tab_id > iMaxTableID)
299
 
                iMaxTableID = tab_id;
300
 
        tab = MSTable::newTable(tab_id, tab_name, this, file_size, to_delete);
301
 
        iTableList->add(tab);
302
 
        iTableArray->set(tab_id, RETAIN(tab));
303
 
}
304
 
 
305
 
void MSDatabase::addTableFromFile(CSDirectory *dir, const char *file_name, bool to_delete)
306
 
{
307
 
        off64_t file_size;
308
 
        uint32_t        file_id;
309
 
        char    tab_name[MS_TABLE_NAME_SIZE];
310
 
 
311
 
        dir->info(NULL, &file_size, NULL);
312
 
        file_id = fileToTableId(file_name);
313
 
        fileToTableName(MS_TABLE_NAME_SIZE, tab_name, file_name);
314
 
        addTable(file_id, tab_name, file_size, to_delete);
315
 
}
316
 
 
317
 
void MSDatabase::removeTable(MSTable *tab)
318
 
{
319
 
        enter_();
320
 
        push_(tab);
321
 
        lock_(iTableList);
322
 
        iTableList->remove(tab->myTableName);
323
 
        iTableArray->remove(tab->myTableID);
324
 
        unlock_(iTableList);
325
 
        release_(tab);
326
 
        exit_();
327
 
}
328
 
 
329
 
void MSDatabase::dropTable(MSTable *tab)
330
 
{
331
 
        enter_();
332
 
        push_(tab);
333
 
        lock_(iTableList);
334
 
        iTableList->remove(tab->myTableName);
335
 
        iTableArray->remove(tab->myTableID);
336
 
 
337
 
        // Cute: you drop the table by adding it with the 'to_delete' flag set to 'true'
338
 
        addTable(tab->myTableID, tab->myTableName->getCString(), tab->getTableFileSize(), true);
339
 
 
340
 
        unlock_(iTableList);
341
 
        release_(tab);
342
 
        exit_();
343
 
}
344
 
 
345
 
// This function is used when dropping tables from a database before
346
 
// dropping the database itself. 
347
 
CSString *MSDatabase::getATableName()
348
 
{
349
 
        uint32_t i = 0;
350
 
        MSTable *tab;
351
 
        CSString *name = NULL;
352
 
        
353
 
        enter_();
354
 
        lock_(iTableList);
355
 
 
356
 
        while ((tab = (MSTable *) iTableList->itemAt(i++)) && tab->isToDelete()) ;
357
 
        if (tab) {
358
 
                name = tab->getTableName();
359
 
                name->retain();
360
 
        }
361
 
        unlock_(iTableList);
362
 
        return_(name);
363
 
}
364
 
 
365
 
uint32_t MSDatabase::getTableCount()
366
 
{
367
 
        uint32_t cnt = 0, i = 0;
368
 
        MSTable *tab;
369
 
        
370
 
        enter_();
371
 
        lock_(iTableList);
372
 
 
373
 
        while ((tab = (MSTable *) iTableList->itemAt(i++))) {
374
 
                if (!tab->isToDelete())
375
 
                        cnt++;
376
 
        }
377
 
 
378
 
        unlock_(iTableList);
379
 
        return_(cnt);
380
 
}
381
 
 
382
 
 
383
 
void MSDatabase::renameTable(MSTable *tab, const char *to_name)
384
 
{
385
 
        enter_();
386
 
        lock_(iTableList);
387
 
        iTableList->remove(tab->myTableName);
388
 
        iTableArray->remove(tab->myTableID);
389
 
 
390
 
        addTable(tab->myTableID, to_name, tab->getTableFileSize(), false);
391
 
 
392
 
        unlock_(iTableList);
393
 
        exit_();
394
 
}
395
 
 
396
 
void MSDatabase::openWriteRepo(MSOpenTable *otab)
397
 
{
398
 
        if (otab->myWriteRepo && otab->myWriteRepoFile)
399
 
                return;
400
 
 
401
 
        enter_();
402
 
        if (!otab->myWriteRepo)
403
 
                otab->myWriteRepo = lockRepo(0);
404
 
 
405
 
        /* Now open the repo file for the open table: */
406
 
        otab->myWriteRepo->openRepoFileForWriting(otab);
407
 
        exit_();
408
 
}
409
 
 
410
 
MSRepository *MSDatabase::getRepoFullOfTrash(time_t *ret_wait_time)
411
 
{
412
 
        MSRepository    *repo = NULL;
413
 
        time_t                  wait_time = 0;
414
 
        
415
 
        if (ret_wait_time)
416
 
                wait_time = *ret_wait_time;
417
 
        enter_();
418
 
        lock_(myRepostoryList);
419
 
        for (uint32_t i=0; i<myRepostoryList->size(); i++) {
420
 
                retry:
421
 
                if ((repo = (MSRepository *) myRepostoryList->get(i))) {
422
 
                        if (!repo->isRemovingFP && !repo->mustBeDeleted && !repo->isRepoLocked()) {
423
 
                                if (!repo->myRepoHeadSize) {
424
 
                                        /* The file has not yet been opened, so the
425
 
                                         * garbage count will not be known!
426
 
                                         */
427
 
                                        MSRepoFile *repo_file;
428
 
 
429
 
                                        repo->retain();
430
 
                                        unlock_(myRepostoryList);
431
 
                                        push_(repo);
432
 
                                        repo_file = repo->openRepoFile();
433
 
                                        repo_file->release();
434
 
                                        release_(repo);
435
 
                                        lock_(myRepostoryList);
436
 
                                        goto retry;
437
 
                                }
438
 
                                if (repo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
439
 
                                        /* Make sure there are not temp BLOBs in this repository that have
440
 
                                         * not yet timed out:
441
 
                                         */
442
 
                                        time_t now = time(NULL);
443
 
                                        time_t then = repo->myLastTempTime;
444
 
 
445
 
                                        /* Check if there are any temp BLOBs to be removed: */
446
 
                                        if (now > (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
447
 
                                                repo->lockRepo(REPO_COMPACTING); 
448
 
                                                repo->retain();
449
 
                                                break;
450
 
                                        }
451
 
                                        else {
452
 
                                                /* There are temp BLOBs to wait for... */
453
 
                                                if (!wait_time || wait_time > MSTempLog::adjustWaitTime(then, now))
454
 
                                                        wait_time = MSTempLog::adjustWaitTime(then, now);
455
 
                                        }
456
 
                                }
457
 
                        }
458
 
                        repo = NULL;
459
 
                }
460
 
        }
461
 
        unlock_(myRepostoryList);
462
 
        if (ret_wait_time)
463
 
                *ret_wait_time = wait_time;
464
 
        return_(repo);
465
 
}
466
 
 
467
 
MSRepository *MSDatabase::lockRepo(off64_t size)
468
 
{
469
 
        MSRepository    *repo;
470
 
        uint32_t                        free_slot;
471
 
 
472
 
        enter_();
473
 
        lock_(myRepostoryList);
474
 
        free_slot = myRepostoryList->size();
475
 
        /* Find an unlocked repository file that is below the write threshold: */
476
 
        for (uint32_t i=0; i<myRepostoryList->size(); i++) {
477
 
                if ((repo = (MSRepository *) myRepostoryList->get(i))) {
478
 
                        if ((!repo->isRepoLocked()) && (!repo->isRemovingFP) && (!repo->mustBeDeleted) &&
479
 
                                ((repo->myRepoFileSize + size) < PBMSParameters::getRepoThreshold()) 
480
 
                                /**/ && (repo->getGarbageLevel() < PBMSParameters::getGarbageThreshold()))
481
 
                                goto found1;
482
 
                }
483
 
                else {
484
 
                        if (i < free_slot)
485
 
                                free_slot = i;
486
 
                }
487
 
        }
488
 
 
489
 
        /* None found, create a new repo file: */
490
 
        new_(repo, MSRepository(free_slot + 1, this, 0));
491
 
        myRepostoryList->set(free_slot, repo);
492
 
 
493
 
        found1:
494
 
        repo->retain();
495
 
        repo->lockRepo(REPO_WRITE);  // <- The MSRepository::backToPool() will unlock this.
496
 
        unlock_(myRepostoryList);
497
 
        return_(repo);
498
 
}
499
 
 
500
 
MSRepoFile *MSDatabase::getRepoFileFromPool(uint32_t repo_id, bool missing_ok)
501
 
{
502
 
        MSRepository    *repo;
503
 
        MSRepoFile              *file;
504
 
 
505
 
        enter_();
506
 
        lock_(myRepostoryList);
507
 
        if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1))) {
508
 
                if (!missing_ok) {
509
 
                        char buffer[CS_EXC_MESSAGE_SIZE];
510
 
 
511
 
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown repository file: ");
512
 
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
513
 
                        CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
514
 
                }
515
 
                unlock_(myRepostoryList);
516
 
                return_(NULL);
517
 
        }
518
 
        if (repo->isRemovingFP) {
519
 
                char buffer[CS_EXC_MESSAGE_SIZE];
520
 
 
521
 
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Repository will be removed: ");
522
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
523
 
                CSException::throwException(CS_CONTEXT, MS_ERR_REMOVING_REPO, buffer);
524
 
        }
525
 
        repo->retain(); /* Release is here: [++] */
526
 
        file = repo->getRepoFile();
527
 
        unlock_(myRepostoryList);
528
 
 
529
 
        if (!file) {
530
 
                file = repo->openRepoFile();
531
 
                lock_(myRepostoryList);
532
 
                repo->addRepoFile(RETAIN(file));
533
 
                unlock_(myRepostoryList);
534
 
        }
535
 
        return_(file);
536
 
}
537
 
 
538
 
void MSDatabase::returnRepoFileToPool(MSRepoFile *file)
539
 
{
540
 
        MSRepository    *repo;
541
 
 
542
 
        enter_();
543
 
        lock_(myRepostoryList);
544
 
        push_(file);
545
 
        if ((repo = file->myRepo)) {
546
 
                if (repo->isRemovingFP) {
547
 
                        repo->removeRepoFile(file); // No retain expected
548
 
                        myRepostoryList->wakeup();
549
 
                }
550
 
                else
551
 
                        repo->returnRepoFile(file); // No retain expected
552
 
                repo->release(); /* [++] here is the release.  */
553
 
        }
554
 
        release_(file);
555
 
        unlock_(myRepostoryList);
556
 
        exit_();
557
 
}
558
 
 
559
 
void MSDatabase::removeRepo(uint32_t repo_id, bool *mustQuit)
560
 
{
561
 
        MSRepository *repo;
562
 
 
563
 
        enter_();
564
 
        lock_(myRepostoryList);
565
 
        while ((!mustQuit || !*mustQuit) && !iClosing) {
566
 
                if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1)))
567
 
                        break;
568
 
                repo->isRemovingFP = true;
569
 
                if (repo->removeRepoFilesNotInUse()) {
570
 
                        myRepostoryList->set(repo_id - 1, NULL);
571
 
                        break;
572
 
                }
573
 
                /*
574
 
                 * Wait for the files that are in use to be
575
 
                 * freed.
576
 
                 */
577
 
                myRepostoryList->wait();
578
 
        }
579
 
        unlock_(myRepostoryList);
580
 
        exit_();
581
 
}
582
 
 
583
 
void MSDatabase::queueTempLogEvent(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
584
 
         uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time)
585
 
{
586
 
        MSTempLogItemRec        item;
587
 
        uint32_t                                timev;
588
 
 
589
 
        // Each otab object holds an handle to an instance of an OPEN
590
 
        // temp log. This is so that each thread has it's own open temp log 
591
 
        // and doesn't need to be opened and close it constantly.
592
 
        
593
 
        enter_();
594
 
        lock_(myTempLogArray);
595
 
        if (!iWriteTempLog) {
596
 
                iWriteTempLog = (MSTempLog *) myTempLogArray->last();
597
 
                if (!iWriteTempLog) {
598
 
                        new_(iWriteTempLog, MSTempLog(1, this, 0));
599
 
                        myTempLogArray->set(1, iWriteTempLog);
600
 
                }
601
 
        }
602
 
        if (!otab->myTempLogFile)
603
 
                otab->myTempLogFile = iWriteTempLog->openTempLog();
604
 
        else if (otab->myTempLogFile->myTempLogID != iWriteTempLog->myLogID) {
605
 
                otab->myTempLogFile->release();
606
 
                otab->myTempLogFile = NULL;
607
 
                otab->myTempLogFile = iWriteTempLog->openTempLog();
608
 
        }
609
 
 
610
 
        if (iWriteTempLog->myTempLogSize >= PBMSParameters::getTempLogThreshold()) {
611
 
                uint32_t tmp_log_id = iWriteTempLog->myLogID + 1;
612
 
 
613
 
                new_(iWriteTempLog, MSTempLog(tmp_log_id, this, 0));
614
 
                myTempLogArray->set(tmp_log_id, iWriteTempLog);
615
 
 
616
 
                otab->myTempLogFile->release();
617
 
                otab->myTempLogFile = NULL;
618
 
                otab->myTempLogFile = iWriteTempLog->openTempLog();
619
 
 
620
 
        }
621
 
 
622
 
        timev = time(NULL);
623
 
        *log_id = iWriteTempLog->myLogID;
624
 
        *log_offset = (uint32_t) iWriteTempLog->myTempLogSize;
625
 
        if (q_time)
626
 
                *q_time = timev;
627
 
        iWriteTempLog->myTempLogSize += iWriteTempLog->myTemplogRecSize;
628
 
        unlock_(myTempLogArray);
629
 
 
630
 
        CS_SET_DISK_1(item.ti_type_1, type);
631
 
        CS_SET_DISK_4(item.ti_table_id_4, tab_id);
632
 
        CS_SET_DISK_6(item.ti_blob_id_6, blob_id);
633
 
        CS_SET_DISK_4(item.ti_auth_code_4, auth_code);
634
 
        CS_SET_DISK_4(item.ti_time_4, timev);
635
 
        otab->myTempLogFile->write(&item, *log_offset, sizeof(MSTempLogItemRec));
636
 
        
637
 
        
638
 
        exit_();
639
 
}
640
 
 
641
 
#ifdef HAVE_ALIAS_SUPPORT
642
 
void MSDatabase::queueForDeletion(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
643
 
         uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time, MSDiskAliasPtr aliasDiskRec)
644
 
{
645
 
        enter_();
646
 
        
647
 
        queueTempLogEvent(otab, type, tab_id, blob_id, auth_code, log_id, log_offset, q_time);
648
 
                
649
 
        // If it has an alias remove it from the ailias index.
650
 
        if (aliasDiskRec) {
651
 
                try_(a) {
652
 
                        deleteBlobAlias(aliasDiskRec);
653
 
                }
654
 
                catch_(a);
655
 
                self->logException();
656
 
                cont_(a);
657
 
        }
658
 
        
659
 
        exit_();
660
 
}
661
 
#endif
662
 
 
663
 
MSTempLogFile *MSDatabase::openTempLogFile(uint32_t log_id, size_t *log_rec_size, size_t *log_head_size)
664
 
{
665
 
        MSTempLog               *log;
666
 
        MSTempLogFile   *log_file = NULL;
667
 
 
668
 
        enter_();
669
 
        lock_(myTempLogArray);
670
 
        if (log_id)
671
 
                log = (MSTempLog *) myTempLogArray->get(log_id);
672
 
        else
673
 
                log = (MSTempLog *) myTempLogArray->first();
674
 
        if (log) {
675
 
                log_file = log->openTempLog();
676
 
                if (log_rec_size)
677
 
                        *log_rec_size = log->myTemplogRecSize;
678
 
                if (log_head_size)
679
 
                        *log_head_size = log->myTempLogHeadSize;
680
 
        }
681
 
        unlock_(myTempLogArray);
682
 
        return_(log_file);
683
 
}
684
 
 
685
 
uint32_t MSDatabase::getTempLogCount()
686
 
{
687
 
        uint32_t count;
688
 
 
689
 
        enter_();
690
 
        lock_(myTempLogArray);
691
 
        count = myTempLogArray->size();
692
 
        unlock_(myTempLogArray);
693
 
        return_(count);
694
 
}
695
 
 
696
 
void MSDatabase::removeTempLog(uint32_t log_id)
697
 
{
698
 
        enter_();
699
 
        lock_(myTempLogArray);
700
 
        myTempLogArray->remove(log_id);
701
 
        unlock_(myTempLogArray);
702
 
        exit_();
703
 
}
704
 
 
705
 
CSObject *MSDatabase::getKey()
706
 
{
707
 
        return (CSObject *) myDatabaseName;
708
 
}
709
 
 
710
 
int MSDatabase::compareKey(CSObject *key)
711
 
{
712
 
        return myDatabaseName->compare((CSString *) key);
713
 
}
714
 
 
715
 
MSCompactorThread *MSDatabase::getCompactorThread()
716
 
{
717
 
        return myCompactorThread;
718
 
}
719
 
 
720
 
CSSyncVector *MSDatabase::getRepositoryList()
721
 
{       
722
 
        return myRepostoryList;
723
 
}
724
 
 
725
 
#ifdef HAVE_ALIAS_SUPPORT
726
 
uint32_t MSDatabase::registerBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
727
 
{
728
 
        uint32_t hash;
729
 
        bool can_retry = true;
730
 
        enter_();
731
 
        
732
 
retry:
733
 
        lock_(&iBlobAliaseLock);
734
 
        
735
 
        try_(a) {
736
 
                hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
737
 
        }
738
 
        
739
 
        catch_(a) {
740
 
                unlock_(&iBlobAliaseLock);
741
 
                if (can_retry) {
742
 
                        // It can be that a duplicater alias exists that was deleted
743
 
                        // but the transaction has not been written to the repository yet.
744
 
                        // Flush all committed transactions to the repository file.
745
 
                        MSTransactionManager::flush();
746
 
                        can_retry = false;
747
 
                        goto retry;
748
 
                }
749
 
                throw_();
750
 
        }
751
 
        
752
 
        cont_(a);
753
 
        unlock_(&iBlobAliaseLock);
754
 
        return_(hash);
755
 
}
756
 
 
757
 
uint32_t MSDatabase::updateBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t old_alias_hash, const char *alias)
758
 
{
759
 
        uint32_t new_hash;
760
 
        enter_();
761
 
        lock_(&iBlobAliaseLock);
762
 
        
763
 
        new_hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
764
 
        iBlobAliases->deleteAlias(repo_id, repo_offset, old_alias_hash);
765
 
 
766
 
        unlock_(&iBlobAliaseLock);
767
 
        return_(new_hash);
768
 
}
769
 
 
770
 
void MSDatabase::deleteBlobAlias(MSDiskAliasPtr diskRec)
771
 
{
772
 
        enter_();
773
 
        lock_(&iBlobAliaseLock);
774
 
        iBlobAliases->deleteAlias(diskRec);
775
 
        unlock_(&iBlobAliaseLock);
776
 
        exit_();
777
 
}
778
 
 
779
 
void MSDatabase::deleteBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
780
 
{
781
 
        MSDiskAliasRec diskRec;
782
 
        
783
 
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
784
 
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
785
 
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
786
 
        deleteBlobAlias(&diskRec);
787
 
}
788
 
 
789
 
void MSDatabase::moveBlobAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
790
 
{
791
 
        enter_();
792
 
        lock_(&iBlobAliaseLock);
793
 
        iBlobAliases->resetAlias(old_repo_id, old_repo_offset, alias_hash, new_repo_id, new_repo_offset);
794
 
        unlock_(&iBlobAliaseLock);
795
 
        exit_();
796
 
}
797
 
#endif
798
 
 
799
 
bool MSDatabase::isValidHeaderField(const char *name)
800
 
{
801
 
        bool is_valid = false;
802
 
        CSString                *header;
803
 
        enter_();
804
 
 
805
 
        if (name && *name) {
806
 
                if (strcasecmp(name, MS_ALIAS_TAG)) {
807
 
                        lock_(&iHTTPMetaDataHeaders);
808
 
                        header = CSString::newString(name);
809
 
                        push_(header);
810
 
                                
811
 
                        is_valid = (iHTTPMetaDataHeaders.find(header) != NULL);
812
 
                        release_(header);
813
 
                        
814
 
                        unlock_(&iHTTPMetaDataHeaders);
815
 
                } else 
816
 
                        is_valid = true;
817
 
        }
818
 
        
819
 
        return_(is_valid);
820
 
}
821
 
 
822
 
void MSDatabase::startUp(const char *default_http_headers)
823
 
{
824
 
        enter_();
825
 
        
826
 
        new_(gDatabaseList, CSSyncSortedList);
827
 
        new_(gDatabaseArray, CSSparseArray(5));
828
 
        MSHTTPHeaderTable::setDefaultMetaDataHeaders(default_http_headers);
829
 
        PBMSSystemTables::systemTablesStartUp();
830
 
        PBMSParameters::setBackupDatabaseID(1);
831
 
        exit_();
832
 
}
833
 
 
834
 
void MSDatabase::stopThreads()
835
 
{
836
 
        MSDatabase *db;
837
 
 
838
 
        enter_();
839
 
        if (gDatabaseList) {
840
 
                lock_(gDatabaseList);
841
 
                for (int i=0;;i++) {
842
 
                        if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
843
 
                                break;
844
 
                        db->iClosing = true;
845
 
                        
846
 
                        if (db->myTempLogThread) {
847
 
                                db->myTempLogThread->stop();
848
 
                                db->myTempLogThread->release();
849
 
                                db->myTempLogThread = NULL;
850
 
                        }
851
 
                        if (db->myCompactorThread) {
852
 
                                db->myRepostoryList->wakeup(); // The compator thread waits on this.
853
 
                                db->myCompactorThread->stop();
854
 
                                db->myCompactorThread->release();
855
 
                                db->myCompactorThread = NULL;
856
 
                        }
857
 
                        
858
 
                        if (db->iBackupThread) {
859
 
                                db->iBackupThread->stop();
860
 
                                db->iBackupThread->release();
861
 
                                db->iBackupThread = NULL;
862
 
                        }
863
 
        
864
 
                }
865
 
                
866
 
                unlock_(gDatabaseList);
867
 
        }
868
 
        exit_();
869
 
}
870
 
 
871
 
void MSDatabase::shutDown()
872
 
{
873
 
                
874
 
        if (gDatabaseArray) {
875
 
                gDatabaseArray->clear();
876
 
                gDatabaseArray->release();
877
 
                gDatabaseArray = NULL;
878
 
        }
879
 
        
880
 
        if (gDatabaseList) {
881
 
                gDatabaseList->clear();
882
 
                gDatabaseList->release();
883
 
                gDatabaseList = NULL;
884
 
        }
885
 
        
886
 
        MSHTTPHeaderTable::releaseDefaultMetaDataHeaders();
887
 
        PBMSSystemTables::systemTableShutDown();
888
 
}
889
 
 
890
 
void MSDatabase::setBackupDatabase()
891
 
{
892
 
        enter_();
893
 
        // I need to give the backup database a unique fake database ID.
894
 
        // This is so that it is not confused with the database being backed
895
 
        // backed up when opening tables.
896
 
        
897
 
        // Normally database IDs are generated by time(NULL) so small database IDs
898
 
        // are safe to use as fake IDs.
899
 
         
900
 
        lock_(gDatabaseList);
901
 
        myDatabaseID = PBMSParameters::getBackupDatabaseID() +1;
902
 
        PBMSParameters::setBackupDatabaseID(myDatabaseID);
903
 
        gDatabaseArray->set(myDatabaseID, RETAIN(this));
904
 
        isBackup = true;
905
 
        
906
 
        // Notify the cloud storage, if any, that it is a backup.
907
 
        // This is important because if the backup database is dropped
908
 
        // we need to be sure that only the BLOBs belonging to the
909
 
        // backup are removed from the cloud.
910
 
        myBlobCloud->cl_setCloudIsBackup(); 
911
 
        
912
 
        unlock_(gDatabaseList);
913
 
        
914
 
        // Rename the database path so that it is obviouse that this is an incomplete backup database.
915
 
        // When the backup is completed it will be renamed back.
916
 
        CSPath *new_path = CSPath::newPath(myDatabasePath->concat("#"));
917
 
        push_(new_path);
918
 
        
919
 
        if (new_path->exists()) 
920
 
                new_path->remove();
921
 
        
922
 
        CSPath *db_path = CSPath::newPath(RETAIN(myDatabasePath));
923
 
        push_(db_path);
924
 
        
925
 
        db_path->rename(new_path->getNameCString());
926
 
        myDatabasePath->release();
927
 
        myDatabasePath = new_path->getString();
928
 
        myDatabasePath->retain();
929
 
        
930
 
        release_(db_path);
931
 
        release_(new_path);
932
 
        
933
 
        
934
 
        exit_();
935
 
}
936
 
 
937
 
void MSDatabase::releaseBackupDatabase()
938
 
{
939
 
        enter_();
940
 
 
941
 
        
942
 
        // The backup has completed succefully, rename the path to the correct name.
943
 
        CSPath *db_path = CSPath::newPath(myDatabasePath->getCString());
944
 
        push_(db_path);
945
 
        
946
 
        myDatabasePath->setLength(myDatabasePath->length()-1);
947
 
        db_path->rename(cs_last_name_of_path(myDatabasePath->getCString()));
948
 
        release_(db_path);
949
 
        
950
 
        // Remove the backup database object.
951
 
        lock_(gDatabaseList);
952
 
        gDatabaseArray->remove(myDatabaseID);
953
 
        MSTableList::removeDatabaseTables(this); // Will also release the database object.
954
 
        unlock_(gDatabaseList);
955
 
        
956
 
        
957
 
        exit_();
958
 
}
959
 
 
960
 
void MSDatabase::startBackup(MSBackupInfo *backup_info)
961
 
{
962
 
        enter_();
963
 
 
964
 
        push_(backup_info);
965
 
        if (iBackupThread) {
966
 
                if (iBackupThread->isRunning()) {
967
 
                        CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, "A backup is still running.");
968
 
                }
969
 
                iBackupThread->release();
970
 
                iBackupThread = NULL;
971
 
        }
972
 
        
973
 
        pop_(backup_info);
974
 
        iBackupThread = MSBackup::newMSBackup(backup_info);
975
 
        
976
 
        try_(a) {
977
 
                iBackupThread->startBackup(RETAIN(this));
978
 
        }
979
 
        
980
 
        catch_(a) {
981
 
                iBackupThread->release();
982
 
                iBackupThread = NULL;
983
 
                throw_();
984
 
        }
985
 
        cont_(a);
986
 
        
987
 
        exit_();
988
 
}
989
 
 
990
 
bool MSDatabase::backupStatus(uint64_t *total, uint64_t *completed, bool *completed_ok)
991
 
{
992
 
        bool done;
993
 
        
994
 
        enter_();
995
 
        
996
 
        if (iBackupThread) {
997
 
                *total = iBackupThread->getBackupSize();
998
 
                *completed = iBackupThread->getBackupCompletedSize();
999
 
                done = !iBackupThread->isRunning();
1000
 
                *completed = (iBackupThread->getStatus() == 0);
1001
 
        } else {
1002
 
                *completed_ok = done = true;
1003
 
                *total = *completed = 0;                        
1004
 
        }
1005
 
                
1006
 
        return_(done);
1007
 
}
1008
 
 
1009
 
uint32_t MSDatabase::backupID()
1010
 
1011
 
        return (iBackupThread)?iBackupThread->backupID(): 0;
1012
 
}
1013
 
 
1014
 
void MSDatabase::terminateBackup()
1015
 
{
1016
 
        if (iBackupThread) {
1017
 
                iBackupThread->stop();
1018
 
                iBackupThread->release();
1019
 
                iBackupThread = NULL;
1020
 
        }
1021
 
}
1022
 
 
1023
 
MSDatabase *MSDatabase::getDatabase(CSString *db_name, bool create)
1024
 
{
1025
 
        MSDatabase *db;
1026
 
        enter_();
1027
 
        push_(db_name);
1028
 
        
1029
 
        
1030
 
        lock_(gDatabaseList);
1031
 
        if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1032
 
                db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1033
 
                if (!db)
1034
 
                        goto exit;
1035
 
        } else
1036
 
                db->retain();
1037
 
        
1038
 
        exit:
1039
 
        unlock_(gDatabaseList);
1040
 
        release_(db_name);
1041
 
        return_(db);
1042
 
}
1043
 
 
1044
 
MSDatabase *MSDatabase::getDatabase(const char *db_name, bool create)
1045
 
{
1046
 
        return getDatabase(CSString::newString(db_name), create);
1047
 
}
1048
 
 
1049
 
MSDatabase *MSDatabase::getDatabase(uint32_t db_id, bool missing_ok)
1050
 
{
1051
 
        MSDatabase *db;
1052
 
        
1053
 
        enter_();
1054
 
        lock_(gDatabaseList);
1055
 
        if ((db = (MSDatabase *) gDatabaseArray->get((uint32_t) db_id))) 
1056
 
                db->retain();
1057
 
        else {
1058
 
                // Look for the database folder with the correct ID:
1059
 
                CSPath *path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1060
 
                push_(path);
1061
 
                if (path->exists()) {
1062
 
                        CSDirectory *dir;
1063
 
                        dir = CSDirectory::newDirectory(RETAIN(path));
1064
 
                        push_(dir);
1065
 
                        dir->open();
1066
 
                        
1067
 
                        while (dir->next() && !db) {
1068
 
                                if (!dir->isFile()) {
1069
 
                                        const char *ptr, *dir_name  = dir->name();
1070
 
                                        ptr = dir_name + strlen(dir_name) -1;
1071
 
                                        
1072
 
                                        while (ptr > dir_name && *ptr != '-') ptr--;
1073
 
                                        
1074
 
                                        if (*ptr ==  '-') {
1075
 
                                                int len = ptr - dir_name;
1076
 
                                                ptr++;
1077
 
                                                if ((strtoul(ptr, NULL, 10) == db_id) && len) {
1078
 
                                                        db = getDatabase(CSString::newString(dir_name, len), true);
1079
 
                                                        ASSERT(db->myDatabaseID == db_id);
1080
 
                                                }
1081
 
                                        }
1082
 
                                }
1083
 
                        }
1084
 
                        release_(dir);
1085
 
                }
1086
 
                release_(path);         
1087
 
        }
1088
 
        unlock_(gDatabaseList);
1089
 
        
1090
 
        if ((!db) && !missing_ok) {
1091
 
                char buffer[CS_EXC_MESSAGE_SIZE];
1092
 
 
1093
 
                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database #");
1094
 
                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) db_id);
1095
 
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
1096
 
        }
1097
 
        return_(db);
1098
 
}
1099
 
 
1100
 
void MSDatabase::wakeTempLogThreads()
1101
 
{
1102
 
        MSDatabase *db;
1103
 
 
1104
 
        if (!gDatabaseList)
1105
 
                return;
1106
 
        enter_();
1107
 
        lock_(gDatabaseList);
1108
 
        for (int i=0;;i++) {
1109
 
                if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
1110
 
                        break;
1111
 
                if (db->myTempLogThread)
1112
 
                        db->myTempLogThread->wakeup();
1113
 
        }
1114
 
        unlock_(gDatabaseList);
1115
 
        exit_();
1116
 
}
1117
 
 
1118
 
uint32_t MSDatabase::getDBID(CSPath *path, CSString *db_name)
1119
 
{
1120
 
        CSDirectory             *dir;
1121
 
        uint32_t                        db_id = 0;
1122
 
        int                             len = db_name->length();
1123
 
        const char              *ptr;
1124
 
        
1125
 
        enter_();
1126
 
        push_(db_name);
1127
 
        push_(path);
1128
 
        
1129
 
        // Search for the ID of the database
1130
 
        dir = CSDirectory::newDirectory(RETAIN(path));
1131
 
        push_(dir);
1132
 
        dir->open();
1133
 
        while (dir->next() && !db_id) 
1134
 
                {
1135
 
                if (!dir->isFile()){
1136
 
                        ptr = dir->name() + strlen(dir->name()) -1;
1137
 
                        while (ptr > dir->name() && isdigit(*ptr)) ptr--;
1138
 
                        if ((*ptr == '-') && (len == (ptr - dir->name())) && !db_name->compare(dir->name(), len) ) {
1139
 
                                db_id = atol(ptr+1);                            
1140
 
                        }
1141
 
                }
1142
 
        }
1143
 
        release_(dir);
1144
 
        
1145
 
        if (!db_id) {
1146
 
                db_id = time(NULL);
1147
 
 
1148
 
                while (1) { // search for a unique db_id
1149
 
                        dir = CSDirectory::newDirectory(RETAIN(path));
1150
 
                        push_(dir);
1151
 
                        dir->open();
1152
 
                        while (db_id && dir->next()) {
1153
 
                                if (!dir->isFile()) {
1154
 
                                        ptr = dir->name() + strlen(dir->name()) -1;
1155
 
                                        while (ptr > dir->name() && isdigit(*ptr)) ptr--;
1156
 
                                        if ((*ptr == '-') && (db_id == strtoul(ptr+1, NULL, 10))) {
1157
 
                                                db_id = 0;                              
1158
 
                                        }
1159
 
                                }
1160
 
                        }
1161
 
                        release_(dir);
1162
 
                        if (db_id)
1163
 
                                break;
1164
 
                        sleep(1); // Allow 1 second to pass.
1165
 
                        db_id = time(NULL);
1166
 
                } 
1167
 
        }
1168
 
        
1169
 
        release_(path);
1170
 
        release_(db_name);
1171
 
        return_(db_id);
1172
 
}
1173
 
 
1174
 
CSPath *MSDatabase::createDatabasePath(const char *location, CSString *db_name, uint32_t *db_id_ptr, bool *create, bool is_pbms)
1175
 
{
1176
 
        bool create_path = *create;     
1177
 
        CSPath *path = NULL;
1178
 
        char name_buffer[MS_DATABASE_NAME_SIZE + 40];
1179
 
        uint32_t db_id;
1180
 
        enter_();
1181
 
        
1182
 
        push_(db_name);
1183
 
        *create = false;
1184
 
        path = CSPath::newPath(location, "pbms");
1185
 
        push_(path);
1186
 
        if (!path->exists()) {
1187
 
                if (!create_path){
1188
 
                        release_(path);
1189
 
                        path = NULL;
1190
 
                        goto done;
1191
 
                }
1192
 
                        
1193
 
                *create = true;
1194
 
                path->makeDir();
1195
 
        }
1196
 
 
1197
 
        // If this is the pbms database then nothing more is to be done.
1198
 
        if (is_pbms)
1199
 
                goto done;
1200
 
        
1201
 
        if ((!db_id_ptr) || !*db_id_ptr) {
1202
 
                db_id = getDBID(RETAIN(path), RETAIN(db_name));
1203
 
                if (db_id_ptr)
1204
 
                        *db_id_ptr = db_id;
1205
 
        } else
1206
 
                db_id = *db_id_ptr;
1207
 
 
1208
 
        // Create the PBMS database name with ID
1209
 
        cs_strcpy(MS_DATABASE_NAME_SIZE + 40, name_buffer, db_name->getCString());
1210
 
        cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, "-");
1211
 
        cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, (uint32_t) db_id);
1212
 
                        
1213
 
        pop_(path);
1214
 
        path = CSPath::newPath(path, name_buffer);
1215
 
        push_(path);
1216
 
        if (!path->exists()) {
1217
 
                if (create_path) {
1218
 
                        *create = true;
1219
 
                        path->makeDir();
1220
 
                } else {
1221
 
                        release_(path);
1222
 
                        path = NULL;
1223
 
                }
1224
 
                        
1225
 
        }
1226
 
        
1227
 
done:
1228
 
        if (path)
1229
 
                pop_(path);
1230
 
        release_(db_name);
1231
 
        return_(path);
1232
 
}
1233
 
 
1234
 
 
1235
 
 
1236
 
MSDatabase *MSDatabase::newDatabase(const char *db_location, CSString *db_name, uint32_t        db_id, bool create)
1237
 
{
1238
 
        MSDatabase              *db = NULL;
1239
 
        CSDirectory             *dir;
1240
 
        MSRepository    *repo;
1241
 
        CSPath                  *path;
1242
 
        const char              *file_name;
1243
 
        uint32_t                file_id;
1244
 
        off64_t                 file_size;
1245
 
        MSTempLog               *log;
1246
 
        uint32_t                to_delete = 0;
1247
 
        CSString                *db_path;
1248
 
        bool                    is_pbms = false;
1249
 
 
1250
 
        enter_();
1251
 
 
1252
 
        push_(db_name);
1253
 
 
1254
 
        //is_pbms = (strcmp(db_name->getCString(), "pbms") == 0); To be done later.
1255
 
        
1256
 
        /*
1257
 
         * Block the creation of the pbms database if there is no MySQL database. 
1258
 
         * The database name is case sensitive here if the file system names are
1259
 
         * case sensitive. This is desirable.
1260
 
         */
1261
 
        path = CSPath::newPath(ms_my_get_mysql_home_path(), RETAIN(db_name));
1262
 
        push_(path);
1263
 
        if (create && !path->exists()) {
1264
 
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1265
 
        }
1266
 
        release_(path);
1267
 
        
1268
 
         // Create the database path, if 'create' == false then it can return NULL
1269
 
        path = createDatabasePath(db_location, RETAIN(db_name), &db_id, &create, is_pbms);
1270
 
        if (!path) {
1271
 
                release_(db_name);
1272
 
                return_(NULL);
1273
 
        }
1274
 
        push_(path);
1275
 
        
1276
 
        // Create the database object and initialize it.
1277
 
        if (!(db = new MSDatabase())) {
1278
 
                CSException::throwOSError(CS_CONTEXT, ENOMEM);
1279
 
        }
1280
 
        
1281
 
        db->myIsPBMS = is_pbms;
1282
 
        db_path = path->getString();
1283
 
        db_path->retain();
1284
 
        release_(path);
1285
 
        path = NULL;
1286
 
                
1287
 
        db->iNextBlobRefId = (uint32_t) time(NULL);
1288
 
        db->iNextBlobRefId <<= 32;
1289
 
        db->iNextBlobRefId = COMMIT_MASK(db->iNextBlobRefId);
1290
 
        db->iNextBlobRefId++;
1291
 
        
1292
 
        db->myDatabaseID = db_id;
1293
 
        db->myDatabasePath = db_path;
1294
 
        db->myDatabaseName = db_name;
1295
 
        new_(db->myBlobCloud, CloudDB(db_id));
1296
 
 
1297
 
        pop_(db_name);  
1298
 
        
1299
 
        push_(db);
1300
 
        
1301
 
        
1302
 
        new_(db->myTempLogArray, CSSyncSparseArray(20));
1303
 
        new_(db->iTableList, CSSyncSortedList());
1304
 
        new_(db->iTableArray, CSSparseArray(20));
1305
 
        new_(db->myRepostoryList, CSSyncVector(20));
1306
 
        
1307
 
        if (!is_pbms) { //
1308
 
#ifdef HAVE_ALIAS_SUPPORT
1309
 
                //db->retain(); no retain here, MSAlias() takes a back ref.
1310
 
                new_(db->iBlobAliases, MSAlias(db));
1311
 
#endif          
1312
 
                /* "Load" the database: */
1313
 
 
1314
 
                /* Get the max table ID: */
1315
 
                dir = CSDirectory::newDirectory(RETAIN(db_path));
1316
 
                push_(dir);
1317
 
                dir->open();
1318
 
                while (dir->next()) {
1319
 
                        file_name = dir->name();
1320
 
                        if (dir->isFile() && cs_is_extension(file_name, "bst"))
1321
 
                                db->addTableFromFile(dir, file_name, false);
1322
 
                }
1323
 
                release_(dir);
1324
 
 
1325
 
                path = CSPath::newPath(RETAIN(db_path), "bs-repository");
1326
 
                if (path->exists()) {
1327
 
                        dir = CSDirectory::newDirectory(path);
1328
 
                        push_(dir);
1329
 
                        dir->open();
1330
 
                        while (dir->next()) {
1331
 
                                file_name = dir->name();
1332
 
                                if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1333
 
                                        if ((file_id = fileToTableId(file_name, "repo"))) {
1334
 
                                                dir->info(NULL, &file_size, NULL);
1335
 
                                                new_(repo, MSRepository(file_id, db, file_size));
1336
 
                                                db->myRepostoryList->set(file_id - 1, repo);
1337
 
                                        }
1338
 
                                }
1339
 
                        }
1340
 
                        release_(dir);
1341
 
                }
1342
 
                else {
1343
 
                        path->makeDir();
1344
 
                        path->release();
1345
 
                }
1346
 
 
1347
 
                path = CSPath::newPath(RETAIN(db_path), "bs-logs");
1348
 
                if (path->exists()) {
1349
 
                        dir = CSDirectory::newDirectory(path);
1350
 
                        push_(dir);
1351
 
                        dir->open();
1352
 
                        while (dir->next()) {
1353
 
                                file_name = dir->name();
1354
 
                                if (dir->isFile()) {
1355
 
                                        if (cs_is_extension(file_name, "bs")) {
1356
 
                                                if ((file_id = fileToTableId(file_name, "temp"))) {
1357
 
                                                        dir->info(NULL, &file_size, NULL);
1358
 
                                                        new_(log, MSTempLog(file_id, db, file_size));
1359
 
                                                        db->myTempLogArray->set(file_id, log);
1360
 
                                                }
1361
 
                                        }
1362
 
                                        else if (cs_is_extension(file_name, "bst")) {
1363
 
                                                db->addTableFromFile(dir, file_name, true);
1364
 
                                                to_delete++;
1365
 
                                        }
1366
 
                                }
1367
 
                        }
1368
 
                        release_(dir);
1369
 
                }
1370
 
                else {
1371
 
                        path->makeDir();
1372
 
                        path->release();
1373
 
                }
1374
 
 
1375
 
                if (to_delete) {
1376
 
                        /* Go through and prepare all the tables that are to
1377
 
                         * be deleted:
1378
 
                         */
1379
 
                        uint32_t        i = 0;
1380
 
                        MSTable *tab;
1381
 
                        
1382
 
                        while ((tab = (MSTable *) db->iTableList->itemAt(i))) {
1383
 
                                if (tab->isToDelete())
1384
 
                                        tab->prepareToDelete();
1385
 
                                i++;
1386
 
                        }
1387
 
                }
1388
 
 
1389
 
        }
1390
 
        pop_(db);
1391
 
 
1392
 
        return_(db);
1393
 
}
1394
 
 
1395
 
void MSDatabase::startThreads()
1396
 
{
1397
 
        enter_();
1398
 
 
1399
 
        if (myIsPBMS)
1400
 
                exit_();
1401
 
                
1402
 
#ifdef HAVE_ALIAS_SUPPORT
1403
 
        // iBlobAliases->ma_open() must be called before starting any threads.
1404
 
        iBlobAliases->ma_open(); 
1405
 
#endif
1406
 
 
1407
 
        new_(myTempLogThread, MSTempLogThread(1 * 1000, this));
1408
 
        myTempLogThread->start();
1409
 
 
1410
 
#ifdef MS_COMPACTOR_POLLS
1411
 
        new_(myCompactorThread, MSCompactorThread(MS_COMPACTOR_POLL_FREQ, this));
1412
 
#else
1413
 
        new_(myCompactorThread, MSCompactorThread(MS_DEFAULT_COMPACTOR_WAIT * 1000, this));
1414
 
#endif
1415
 
 
1416
 
        
1417
 
        myCompactorThread->start();
1418
 
        exit_();
1419
 
}
1420
 
 
1421
 
 
1422
 
void MSDatabase::dropDatabase() 
1423
 
{
1424
 
        enter_();
1425
 
 
1426
 
        iDropping = true;
1427
 
        iClosing = true;
1428
 
        
1429
 
        if (iBackupThread) {
1430
 
                iBackupThread->stop();
1431
 
                iBackupThread->release();
1432
 
                iBackupThread = NULL;
1433
 
        }
1434
 
        
1435
 
        if (myTempLogThread) {
1436
 
                myTempLogThread->stop();
1437
 
                myTempLogThread->release();
1438
 
                myTempLogThread = NULL;
1439
 
        }
1440
 
        
1441
 
        if (myCompactorThread) {
1442
 
                myRepostoryList->wakeup(); // The compator thread waits on this.
1443
 
                myCompactorThread->stop();
1444
 
                myCompactorThread->release();
1445
 
                myCompactorThread = NULL;
1446
 
        }
1447
 
        
1448
 
        // Call cloud drop database even if the database is not currrently
1449
 
        // using cloud storage just in case to was in the past. If the connection
1450
 
        // to the cloud is not setup then nothing will be done.
1451
 
        try_(a) {
1452
 
                myBlobCloud->cl_dropDB();
1453
 
        }
1454
 
        catch_(a) {
1455
 
                self->logException();
1456
 
        }
1457
 
        cont_(a);
1458
 
        exit_();
1459
 
}
1460
 
 
1461
 
void MSDatabase::removeDatabasePath(CSString *doomedDatabasePath ) 
1462
 
{
1463
 
        CSPath *path = NULL;
1464
 
        CSDirectory *dir = NULL;
1465
 
        const char *file_name;
1466
 
        enter_();
1467
 
        
1468
 
        push_(doomedDatabasePath);
1469
 
        
1470
 
        // Delete repository files
1471
 
        path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-repository");
1472
 
        push_(path);
1473
 
        if (path->exists()) {
1474
 
                dir = CSDirectory::newDirectory(RETAIN(path));
1475
 
                push_(dir);
1476
 
                dir->open();
1477
 
                while (dir->next()) {
1478
 
                        file_name = dir->name();
1479
 
                        if (dir->isFile() && cs_is_extension(file_name, "bs")) {
1480
 
                                dir->deleteEntry();
1481
 
                        }
1482
 
                }
1483
 
                release_(dir);
1484
 
                if (path->isEmpty())
1485
 
                        path->removeDir();
1486
 
        }
1487
 
        release_(path);
1488
 
 
1489
 
        // Delete temp log files.
1490
 
        path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-logs");
1491
 
        push_(path);
1492
 
        if (path->exists()) {
1493
 
                dir = CSDirectory::newDirectory(RETAIN(path));
1494
 
                push_(dir);
1495
 
                dir->open();
1496
 
                while (dir->next()) {
1497
 
                        file_name = dir->name();
1498
 
                        if (dir->isFile() && (cs_is_extension(file_name, "bs") || cs_is_extension(file_name, "bst"))) {
1499
 
                                dir->deleteEntry();
1500
 
                        }
1501
 
                }
1502
 
                release_(dir);
1503
 
                if (path->isEmpty())
1504
 
                        path->removeDir();
1505
 
        }
1506
 
        release_(path);
1507
 
 
1508
 
        // Delete table reference files.
1509
 
        dir = CSDirectory::newDirectory(RETAIN(doomedDatabasePath));
1510
 
        push_(dir);
1511
 
        dir->open();
1512
 
        while (dir->next()) {
1513
 
                file_name = dir->name();
1514
 
                if (dir->isFile() && cs_is_extension(file_name, "bst"))
1515
 
                        dir->deleteEntry();
1516
 
        }
1517
 
        release_(dir);
1518
 
 
1519
 
#ifdef HAVE_ALIAS_SUPPORT
1520
 
        path = CSPath::newPath(RETAIN(doomedDatabasePath), ACTIVE_ALIAS_INDEX);
1521
 
        push_(path);
1522
 
        path->removeFile();
1523
 
        release_(path);
1524
 
#endif
1525
 
 
1526
 
        PBMSSystemTables::removeSystemTables(RETAIN(doomedDatabasePath));
1527
 
        
1528
 
        path = CSPath::newPath(RETAIN(doomedDatabasePath));
1529
 
        push_(path);
1530
 
        if (path->isEmpty() && !path->isLink()) {
1531
 
                path->removeDir();
1532
 
        } else { 
1533
 
                CSStringBuffer *new_name;
1534
 
                // If the database folder is not empty we rename it to get it out of the way.
1535
 
                // If it is not renamed it will be reused if a database with the same name is
1536
 
                // created again, which will result in the database ID being reused, which may
1537
 
                // have some bad side effects.
1538
 
                new_(new_name, CSStringBuffer());
1539
 
                push_(new_name);
1540
 
                new_name->append(cs_last_name_of_path(doomedDatabasePath->getCString()));
1541
 
                new_name->append("_DROPPED");
1542
 
                path->rename(new_name->getCString());
1543
 
                release_(new_name);
1544
 
        }
1545
 
        release_(path);
1546
 
 
1547
 
        release_(doomedDatabasePath);
1548
 
        
1549
 
        path = CSPath::newPath(PBMSDaemon::getPBMSDir());
1550
 
        push_(path);
1551
 
        if (path->isEmpty() && !path->isLink()) {
1552
 
                path->removeDir();
1553
 
        }
1554
 
        release_(path);
1555
 
 
1556
 
        exit_();
1557
 
}
1558
 
 
1559
 
/* Drop the PBMS database if it exists.
1560
 
 * The root folder 'pbms' will be deleted also
1561
 
 * if it is empty and not a symbolic link.
1562
 
 * The database folder in 'pbms' is deleted if it is empty and
1563
 
 * it is not a symbolic link.
1564
 
 */
1565
 
void MSDatabase::dropDatabase(MSDatabase *doomedDatabase, const char *db_name ) 
1566
 
{
1567
 
        CSString *doomedDatabasePath = NULL;
1568
 
 
1569
 
        enter_();
1570
 
        
1571
 
        if (doomedDatabase) {
1572
 
                push_(doomedDatabase);
1573
 
                
1574
 
                // Remove any pending transactions for the dropped database.
1575
 
                // This is important because if the database is restored it will have the
1576
 
                // same database ID and the old transactions would be applied to it.
1577
 
                MSTransactionManager::dropDatabase(doomedDatabase->myDatabaseID);
1578
 
                
1579
 
                doomedDatabasePath = doomedDatabase->myDatabasePath;
1580
 
                doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1581
 
                
1582
 
                MSTableList::removeDatabaseTables(RETAIN(doomedDatabase));
1583
 
                MSSystemTableShare::removeDatabaseSystemTables(RETAIN(doomedDatabase));
1584
 
 
1585
 
                doomedDatabase->dropDatabase(); // Shutdown database threads.
1586
 
                
1587
 
                // To avoid a deadlock a lock is not taken on the database list
1588
 
                // if shutdown is in progress. The only database that would be
1589
 
                // dropped during a shutdown is an incomplete backup database.
1590
 
                ASSERT(doomedDatabase->isBackup || !self->myMustQuit);
1591
 
                if (!self->myMustQuit) 
1592
 
                        lock_(gDatabaseList); // Be sure to shutdown the database before locking this or it can lead to deadlocks
1593
 
                        
1594
 
                gDatabaseArray->remove(doomedDatabase->myDatabaseID);
1595
 
                if (!doomedDatabase->isBackup)
1596
 
                        gDatabaseList->remove(doomedDatabase->getKey());
1597
 
                if (!self->myMustQuit) 
1598
 
                        unlock_(gDatabaseList); 
1599
 
                ASSERT(doomedDatabase->getRefCount() == 1);
1600
 
                release_(doomedDatabase);
1601
 
                
1602
 
        } else {
1603
 
                CSPath *path;
1604
 
                bool create = false;
1605
 
                uint32_t db_id;
1606
 
                
1607
 
                path = createDatabasePath(ms_my_get_mysql_home_path(), CSString::newString(db_name), &db_id, &create);
1608
 
                
1609
 
                if (path) {
1610
 
                        MSTransactionManager::dropDatabase(db_id);
1611
 
 
1612
 
                        push_(path);
1613
 
                        doomedDatabasePath = path->getString();
1614
 
                        doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
1615
 
                        release_(path);
1616
 
                }
1617
 
        }
1618
 
        
1619
 
        if (doomedDatabasePath)
1620
 
                removeDatabasePath(doomedDatabasePath);
1621
 
        
1622
 
        exit_();
1623
 
}
1624
 
 
1625
 
void MSDatabase::dropDatabase(const char *db_name ) 
1626
 
{
1627
 
        enter_();
1628
 
        dropDatabase(getDatabase(db_name, false), db_name);
1629
 
        exit_();
1630
 
}
1631
 
 
1632
 
// The table_path can be several things here:
1633
 
// 1: <absalute path>/<database>/<table>
1634
 
// 2: <absalute path>/<database>
1635
 
// 3: <database>/<table>
1636
 
bool MSDatabase::convertTablePathToIDs(const char *table_path, uint32_t *db_id, uint32_t *tab_id, bool create) 
1637
 
{
1638
 
        const char      *base = ms_my_get_mysql_home_path();
1639
 
        CSString        *table_url;
1640
 
        CSString        *db_path = NULL;
1641
 
        CSString        *db_name = NULL;
1642
 
        CSString        *tab_name = NULL;
1643
 
        MSDatabase      *db;
1644
 
        enter_();
1645
 
        
1646
 
        *db_id = 0;
1647
 
        *tab_id = 0;
1648
 
        
1649
 
        table_url = CSString::newString(table_path);
1650
 
        if (table_url->startsWith(base)) {
1651
 
                table_url = table_url->right(base);
1652
 
        }
1653
 
        push_(table_url);
1654
 
 
1655
 
 
1656
 
        db_path = table_url->left("/", -1);
1657
 
        push_(db_path);
1658
 
        tab_name = table_url->right("/", -1);
1659
 
        
1660
 
        pop_(db_path);
1661
 
        release_(table_url);
1662
 
 
1663
 
        if (db_path->length() == 0) { // Only a database name was supplied.
1664
 
                db_path->release();
1665
 
                db_name = tab_name;
1666
 
                tab_name = NULL;
1667
 
        } else {
1668
 
                if (tab_name->length() == 0) {
1669
 
                        tab_name->release();
1670
 
                        tab_name = NULL;
1671
 
                } else
1672
 
                        push_(tab_name);
1673
 
                push_(db_path);
1674
 
                db_name = db_path->right("/", -1);
1675
 
                pop_(db_path);
1676
 
                if (db_name->length() == 0) {
1677
 
                        db_name->release();
1678
 
                        db_name = db_path;
1679
 
                } else {
1680
 
                        db_path->release();
1681
 
                        db_path = NULL;
1682
 
                }
1683
 
        }
1684
 
        
1685
 
        db = MSDatabase::getDatabase(db_name, create); // This will release db_name
1686
 
        if (db) {
1687
 
                *db_id = db->myDatabaseID;
1688
 
                if (tab_name) {
1689
 
                        MSTable         *tab;
1690
 
                        pop_(tab_name);
1691
 
                        push_(db);
1692
 
                        tab = db->getTable(tab_name, create);// This will release tab_name
1693
 
                        pop_(db);
1694
 
                        if (tab) {
1695
 
                                *tab_id = tab->myTableID;
1696
 
                                tab->release();
1697
 
                        }
1698
 
                }
1699
 
                        
1700
 
                db->release();
1701
 
        }
1702
 
        
1703
 
        return_((*tab_id > 0) && (*db_id > 0));
1704
 
}
1705
 
 
1706
 
bool MSDatabase::convertTableAndDatabaseToIDs(const char *db_name, const char *tab_name, uint32_t *db_id, uint32_t *tab_id, bool create) 
1707
 
{
1708
 
        MSDatabase      *db;
1709
 
        enter_();
1710
 
        
1711
 
        *db_id = 0;
1712
 
        *tab_id = 0;
1713
 
        
1714
 
        db = MSDatabase::getDatabase(db_name, create); 
1715
 
        if (db) {
1716
 
                push_(db);
1717
 
                *db_id = db->myDatabaseID;
1718
 
                if (tab_name) {
1719
 
                        MSTable         *tab;
1720
 
                        tab = db->getTable(tab_name, create);
1721
 
                        if (tab) {
1722
 
                                *tab_id = tab->myTableID;
1723
 
                                tab->release();
1724
 
                        }
1725
 
                }
1726
 
                        
1727
 
                release_(db);
1728
 
        }
1729
 
        
1730
 
        return_((*tab_id > 0) && (*db_id > 0));
1731
 
}
1732
 
 
1733
 
MSDatabase *MSDatabase::loadDatabase(CSString *db_name, bool create)
1734
 
{
1735
 
        MSDatabase *db;
1736
 
        enter_();
1737
 
        
1738
 
        db = newDatabase(ms_my_get_mysql_home_path(), db_name, 0, create);      
1739
 
        if (db) {
1740
 
                push_(db);
1741
 
                
1742
 
                gDatabaseList->add(RETAIN(db));
1743
 
                
1744
 
                gDatabaseArray->set(db->myDatabaseID, RETAIN(db));
1745
 
                db->startThreads();
1746
 
                PBMSSystemTables::loadSystemTables(RETAIN(db));
1747
 
                        
1748
 
                pop_(db);
1749
 
        }
1750
 
        return_(db);
1751
 
}
1752
 
 
1753
 
uint32_t MSDatabase::getDatabaseID(CSString *db_name, bool create)
1754
 
{
1755
 
        MSDatabase *db;
1756
 
        uint32_t id = 0;
1757
 
        enter_();
1758
 
        push_(db_name);
1759
 
        
1760
 
        
1761
 
        lock_(gDatabaseList);
1762
 
        if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
1763
 
                db = MSDatabase::loadDatabase(RETAIN(db_name), create);
1764
 
                if (!db)
1765
 
                        goto exit;
1766
 
                id = db->myDatabaseID;
1767
 
                db->release();
1768
 
        } else
1769
 
                id = db->myDatabaseID;
1770
 
        
1771
 
        exit:
1772
 
        unlock_(gDatabaseList);
1773
 
        release_(db_name);
1774
 
        return_(id);
1775
 
}
1776
 
 
1777
 
 
1778
 
uint32_t MSDatabase::getDatabaseID(const char *db_name, bool create)
1779
 
{
1780
 
        return getDatabaseID(CSString::newString(db_name), create);
1781
 
}
1782
 
 
1783
 
MSDatabase *MSDatabase::getBackupDatabase(CSString *db_location, CSString *db_name, uint32_t db_id, bool create)
1784
 
{
1785
 
        bool was_created = create; 
1786
 
        CSPath *path;
1787
 
        MSDatabase *db;
1788
 
        enter_();
1789
 
        
1790
 
        push_(db_location);
1791
 
        push_(db_name);
1792
 
        // If the db already exists and 'create' == true then the existing db
1793
 
        // must be deleted.
1794
 
        // Create the database path, if 'create' == false then it can return NULL
1795
 
        path = createDatabasePath(db_location->getCString(), RETAIN(db_name), &db_id, &was_created);
1796
 
        if (!path) {
1797
 
                CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
1798
 
        }
1799
 
        push_(path);
1800
 
        
1801
 
        // If we wanted to create it but it already exists then throw an error.
1802
 
        if ( create && !was_created) {
1803
 
                char str[120];
1804
 
                snprintf(str, 120, "Duplicate database: %s", db_name->getCString());
1805
 
                CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, str);
1806
 
        }
1807
 
                
1808
 
        release_(path);
1809
 
        pop_(db_name);
1810
 
        // everything looks OK
1811
 
        db = newDatabase(db_location->getCString(), db_name, db_id, create);
1812
 
        db->setBackupDatabase();
1813
 
        release_(db_location);
1814
 
        return_(db);
1815
 
}
1816