~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/alias_ms.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
mergeĀ lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2008-12-30
22
 
 *
23
 
 * H&G2JCtL
24
 
 *
25
 
 * BLOB alias index.
26
 
 *
27
 
 */
28
 
 
29
 
#ifdef HAVE_ALIAS_SUPPORT
30
 
#include "cslib/CSConfig.h"
31
 
 
32
 
#include "string.h"
33
 
 
34
 
#ifdef DRIZZLED
35
 
#include <drizzled/common.h>
36
 
#endif
37
 
 
38
 
#include "cslib/CSGlobal.h"
39
 
#include "cslib/CSLog.h"
40
 
#include "cslib/CSStrUtil.h"
41
 
#include "cslib/CSFile.h"
42
 
#include "system_table_ms.h"
43
 
#include "database_ms.h"
44
 
 
45
 
#include "alias_ms.h"
46
 
 
47
 
 
48
 
 
49
 
//------------------------
50
 
MSAlias::~MSAlias()
51
 
{
52
 
        enter_();
53
 
        
54
 
        ASSERT(iClosing);
55
 
        ASSERT(iPoolSysTables.getSize() == 0);
56
 
 
57
 
        
58
 
        if (iFilePath) {
59
 
        
60
 
                if (iDelete)
61
 
                        iFilePath->removeFile();
62
 
                        
63
 
                iFilePath->release();
64
 
        }
65
 
        
66
 
        if (iFileShare)
67
 
                iFileShare->release();
68
 
                
69
 
        exit_();
70
 
}
71
 
 
72
 
//------------------------
73
 
MSAlias::MSAlias(MSDatabase *db_noref)
74
 
{
75
 
        iClosing = false;
76
 
        iDelete = false;
77
 
        iDatabase_br = db_noref;
78
 
        iFilePath = NULL;
79
 
        iFileShare = NULL;
80
 
}
81
 
 
82
 
//------------------------
83
 
void MSAlias::ma_close()
84
 
{
85
 
        enter_();
86
 
        
87
 
        iClosing = true;
88
 
        if (iFileShare)
89
 
                iFileShare->close();
90
 
        iPoolSysTables.clear();
91
 
        exit_();
92
 
}
93
 
 
94
 
//------------------------
95
 
// Compress the index bucket chain and free unused buckets.
96
 
void MSAlias::MSAliasCompress(CSFile *fa, CSSortedList  *freeList, MSABucketLinkedList *bucketChain)
97
 
{
98
 
        // For now I will just remove empty buckets. 
99
 
        // Later this function should also compress the records also 
100
 
        // thus making the searches faster and freeing up more space.
101
 
        MSABucketInfo *b_info, *next;
102
 
        
103
 
        b_info = bucketChain->getFront();
104
 
        while (b_info) {
105
 
                next = b_info->getNextLink();
106
 
                if (b_info->getSize() == 0) {
107
 
                        bucketChain->remove(RETAIN(b_info));
108
 
                        freeList->add(b_info);
109
 
                }               
110
 
                b_info = next;
111
 
        }
112
 
                
113
 
}
114
 
 
115
 
//------------------------
116
 
void MSAlias::MSAliasLoad()
117
 
{       
118
 
        CSFile                  *fa = NULL;
119
 
        CSSortedList    freeList;
120
 
        off64_t                 fileSize;
121
 
 
122
 
        enter_();
123
 
        
124
 
        fa = CSFile::newFile(RETAIN(iFilePath));
125
 
        push_(fa);
126
 
 
127
 
        MSAliasHeadRec header;
128
 
        uint64_t free_list_offset;
129
 
        fa->open(CSFile::DEFAULT);
130
 
        fa->read(&header, 0, sizeof(header), sizeof(header));
131
 
        
132
 
        /* Check the file header: */
133
 
        if (CS_GET_DISK_4(header.ah_magic_4) != MS_ALIAS_FILE_MAGIC)
134
 
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_HEADER_MAGIC);
135
 
        if (CS_GET_DISK_2(header.ah_version_2) != MS_ALIAS_FILE_VERSION)
136
 
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_VERSION_TOO_NEW);
137
 
                
138
 
        free_list_offset = CS_GET_DISK_8(header.ah_free_list_8);
139
 
        
140
 
        fileSize = CS_GET_DISK_8(header.ah_file_size_8);
141
 
 
142
 
        // Do some sanity checks. 
143
 
        if (CS_GET_DISK_2(header.ah_head_size_2) != sizeof(header))
144
 
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
145
 
        
146
 
        if (CS_GET_DISK_2(header.ah_num_buckets_2) != BUCKET_LIST_SIZE)
147
 
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
148
 
        
149
 
        if (CS_GET_DISK_4(header.ah_bucket_size_4) != NUM_RECORDS_PER_BUCKET)
150
 
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
151
 
        
152
 
        if (fileSize != fa->getEOF()) 
153
 
                CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
154
 
        
155
 
        // Load the bucket headers into RAM
156
 
        MSADiskBucketHeadRec bucketHead = {0};
157
 
        uint64_t offset, start_offset;
158
 
 
159
 
        // Fist load the free list:
160
 
        if (free_list_offset) {
161
 
                start_offset = offset = free_list_offset;
162
 
                do {
163
 
                        fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
164
 
                        freeList.add(MSABucketInfo::newMSABucketInfo(offset));
165
 
                        offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);                                    
166
 
                } while (offset != start_offset);
167
 
                
168
 
        }
169
 
        for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
170
 
                uint64_t used, total_space;
171
 
                MSABucketLinkedList *bucketChain = &(iFileShare->msa_buckets[i]);
172
 
                
173
 
                start_offset = offset = sizeof(header) + i * sizeof(MSADiskBucketRec);
174
 
                used = total_space = 0;
175
 
                do {
176
 
                        uint32_t num, end_of_records;
177
 
                        
178
 
                        fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
179
 
                        num = CS_GET_DISK_4(bucketHead.ab_num_recs_4);
180
 
                        end_of_records = CS_GET_DISK_4(bucketHead.ab_eor_rec_4);
181
 
                        total_space += NUM_RECORDS_PER_BUCKET;
182
 
                        used += num;
183
 
                        bucketChain->addFront(MSABucketInfo::newMSABucketInfo(offset, num, end_of_records));
184
 
                        offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);
185
 
                        
186
 
                } while (offset != start_offset);
187
 
                
188
 
                // Pack the index if required
189
 
                if (((total_space - used) /  NUM_RECORDS_PER_BUCKET) > 1) 
190
 
                        MSAliasCompress(fa, &freeList, bucketChain); 
191
 
                        
192
 
        }
193
 
        
194
 
        // If there are free buckets try to free up some disk
195
 
        // space or add them to a free list to be reused later.
196
 
        if (freeList.getSize()) {
197
 
                uint64_t last_bucket = fileSize - sizeof(MSADiskBucketRec);
198
 
                MSABucketInfo *rec;
199
 
                bool reduce = false;
200
 
                
201
 
                // Search for freed buckets at the end of the file
202
 
                // so that they can be released and the file
203
 
                // shrunk.
204
 
                //
205
 
                // The free list has been sorted so that buckets
206
 
                // with the highest file offset are first.
207
 
                do {
208
 
                        rec = (MSABucketInfo*) freeList.itemAt(0);
209
 
                        if (rec->bi_bucket_offset != last_bucket);
210
 
                                break;
211
 
                                
212
 
                        last_bucket -= sizeof(MSADiskBucketRec);
213
 
                        freeList.remove(rec);
214
 
                        reduce = true;
215
 
                } while (freeList.getSize());
216
 
                
217
 
                if (reduce) {
218
 
                        // The file can be reduced in size.
219
 
                        fileSize = last_bucket + sizeof(MSADiskBucketRec);      
220
 
                        fa->setEOF(fileSize);
221
 
                        CS_SET_DISK_8(header.ah_file_size_8, fileSize);
222
 
                        fa->write(&header.ah_file_size_8, offsetof(MSAliasHeadRec,ah_file_size_8) , 8); 
223
 
                }
224
 
                
225
 
                // Add the empty buckets to the index file's empty bucket list.
226
 
                memset(&bucketHead, 0, sizeof(bucketHead));
227
 
                offset = 0;
228
 
                while (freeList.getSize()) { // Add the empty buckets to the empty_bucket list.
229
 
                        rec = (MSABucketInfo*) freeList.takeItemAt(0);
230
 
                        
231
 
                        // buckets are added to the front of the list.
232
 
                        fa->write(&offset, rec->bi_bucket_offset + offsetof(MSADiskBucketHeadRec,ab_next_bucket_8) , 8);
233
 
                        offset =  rec->bi_bucket_offset;
234
 
                        fa->write(&offset, offsetof(MSAliasHeadRec,ah_free_list_8) , 8); 
235
 
                        
236
 
                        iFileShare->msa_empty_buckets.addFront(rec);
237
 
                }
238
 
        }
239
 
        
240
 
        iFileShare->msa_fileSize = fa->getEOF();
241
 
        
242
 
        release_(fa);
243
 
        exit_();
244
 
}
245
 
 
246
 
//------------------------
247
 
void MSAlias::buildAliasIndex()
248
 
{
249
 
        MSBlobHeadRec   blob;
250
 
        MSRepository    *repo;
251
 
        uint64_t                        blob_size, fileSize, offset;
252
 
        uint16_t                        head_size;
253
 
        MSAliasFile             *afile;
254
 
        MSAliasRec              aliasRec;
255
 
        
256
 
        enter_();
257
 
        
258
 
        afile = getAliasFile();
259
 
        frompool_(afile);
260
 
 
261
 
        afile->startLoad();
262
 
 
263
 
        CSSyncVector    *repo_list = iDatabase_br->getRepositoryList();
264
 
        
265
 
        // No locking is required since the index is loaded before the database is opened
266
 
        // and the compactor thread is started.
267
 
 
268
 
        for (uint32_t repo_index =0; repo_index<repo_list->size(); repo_index++) {
269
 
                if ((repo = (MSRepository *) repo_list->get(repo_index))) {
270
 
                        MSRepoFile      *repoFile = repo->openRepoFile();
271
 
                        push_(repoFile);
272
 
                        fileSize = repo->getRepoFileSize();
273
 
                        offset = repo->getRepoHeadSize();
274
 
                        
275
 
                        aliasRec.repo_id = repoFile->myRepo->getRepoID();
276
 
                        
277
 
                        while (offset < fileSize) {
278
 
                                if (repoFile->read(&blob, offset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) 
279
 
                                        break;
280
 
                                        
281
 
                                if ((CS_GET_DISK_1(blob.rb_status_1) == MS_BLOB_REFERENCED) && CS_GET_DISK_2(blob.rb_alias_offset_2)) {
282
 
                                        aliasRec.repo_offset = offset;
283
 
                                        aliasRec.alias_hash = CS_GET_DISK_4(blob.rb_alias_hash_4);
284
 
                                        addAlias(afile, &aliasRec);
285
 
                                }
286
 
                                
287
 
                                head_size = CS_GET_DISK_2(blob.rb_head_size_2);
288
 
                                blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
289
 
                                offset += head_size + blob_size;
290
 
                        }
291
 
                        
292
 
                        release_(repoFile);
293
 
                }
294
 
        }
295
 
        
296
 
        afile->finishLoad();
297
 
        backtopool_(afile);
298
 
 
299
 
        exit_();
300
 
}
301
 
 
302
 
//------------------------
303
 
void MSAlias::MSAliasBuild()
304
 
{
305
 
        CSFile *fa;
306
 
        MSAliasHeadRec header = {0};
307
 
        uint64_t offset, size = sizeof(header) + BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec);
308
 
        enter_();
309
 
        
310
 
        fa = CSFile::newFile(RETAIN(iFilePath));
311
 
        push_(fa);
312
 
 
313
 
        fa->open(CSFile::CREATE | CSFile::TRUNCATE);
314
 
 
315
 
        // Create an empty index with 1 empty bucket in each bucket chain.      
316
 
 
317
 
        CS_SET_DISK_4(header.ah_magic_4,  MS_ALIAS_FILE_MAGIC);
318
 
        CS_SET_DISK_2(header.ah_version_2, MS_ALIAS_FILE_VERSION);
319
 
                
320
 
        CS_SET_DISK_2(header.ah_head_size_2, sizeof(header));
321
 
        CS_SET_DISK_8(header.ah_file_size_8, size);
322
 
        
323
 
        CS_SET_DISK_2(header.ah_num_buckets_2, BUCKET_LIST_SIZE);
324
 
        CS_SET_DISK_2(header.ah_bucket_size_4, NUM_RECORDS_PER_BUCKET);
325
 
        
326
 
        fa->setEOF(size); // Grow the file.
327
 
        fa->write(&header, 0, sizeof(header));
328
 
 
329
 
        offset = sizeof(header);
330
 
        
331
 
        // Initialize the file bucket chains.
332
 
        MSADiskBucketHeadRec bucketHead = {0};
333
 
        for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
334
 
                CS_SET_DISK_8(bucketHead.ab_prev_bucket_8, offset);
335
 
                CS_SET_DISK_8(bucketHead.ab_next_bucket_8, offset);
336
 
                fa->write(&bucketHead, offset, sizeof(MSADiskBucketHeadRec));
337
 
                // Add the bucket to the RAM based list.
338
 
                iFileShare->msa_buckets[i].addFront(MSABucketInfo::newMSABucketInfo(offset));
339
 
                offset += sizeof(MSADiskBucketRec); // NOTE: MSADiskBucketRec not MSADiskBucketHeadRec
340
 
        }
341
 
        
342
 
        fa->sync();
343
 
        
344
 
        
345
 
        
346
 
        fa->close();
347
 
        
348
 
        release_(fa);
349
 
        
350
 
        // Scan through all the BLOBs in the repository and add an entry
351
 
        // for each blob alias.
352
 
        buildAliasIndex();
353
 
 
354
 
        exit_();
355
 
}
356
 
 
357
 
//------------------------
358
 
void MSAlias::ma_open(const char *file_name)
359
 
{
360
 
        bool isdir = false;
361
 
        
362
 
        enter_();
363
 
 
364
 
        iFilePath = CSPath::newPath(RETAIN(iDatabase_br->myDatabasePath), file_name);
365
 
        
366
 
retry:
367
 
        new_(iFileShare, MSAliasFileShare(RETAIN(iFilePath)));
368
 
        
369
 
        if (iFilePath->exists(&isdir)) {
370
 
                try_(a) {
371
 
                        MSAliasLoad(); 
372
 
                }
373
 
                catch_(a) {
374
 
                        // If an error occurs delete the index and rebuild it.
375
 
                        self->myException.log(NULL);
376
 
                        iFileShare->release();
377
 
                        iFilePath->removeFile();
378
 
                        goto retry;
379
 
                }
380
 
                cont_(a);
381
 
        } else
382
 
                MSAliasBuild();
383
 
        
384
 
        
385
 
        exit_();
386
 
}
387
 
 
388
 
//------------------------
389
 
uint32_t MSAlias::hashAlias(const char *ptr)
390
 
{
391
 
        register uint32_t h = 0, g;
392
 
        
393
 
        while (*ptr) {
394
 
                h = (h << 4) + (uint32_t) toupper(*ptr++);
395
 
                if ((g = (h & 0xF0000000)))
396
 
                        h = (h ^ (g >> 24)) ^ g;
397
 
        }
398
 
 
399
 
        return (h);
400
 
}
401
 
 
402
 
//------------------------
403
 
void MSAlias::addAlias(MSAliasFile *af, MSAliasRec *rec)
404
 
{
405
 
        MSDiskAliasRec diskRec;
406
 
        CS_SET_DISK_4(diskRec.ar_repo_id_4, rec->repo_id);      
407
 
        CS_SET_DISK_8(diskRec.ar_offset_8, rec->repo_offset);   
408
 
        CS_SET_DISK_4(diskRec.ar_hash_4, rec->alias_hash);
409
 
        af->addRec(&diskRec);
410
 
 
411
 
}
412
 
 
413
 
//------------------------
414
 
uint32_t MSAlias::addAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
415
 
{
416
 
        MSDiskAliasRec diskRec;
417
 
        uint32_t hash;
418
 
        uint32_t f_repo_id;
419
 
        uint64_t f_repo_offset;
420
 
        bool referenced = false;
421
 
        enter_();
422
 
        
423
 
        hash = hashAlias(alias);
424
 
        
425
 
        // Use a lock to make sure that the same alias cannot be added at the same time.
426
 
        lock_(this);
427
 
        
428
 
        MSAliasFile *af = getAliasFile();
429
 
        frompool_(af);
430
 
 
431
 
        if (findBlobByAlias(RETAIN(af), alias, &referenced, &f_repo_id, &f_repo_offset)) {
432
 
                if ((f_repo_id == repo_id) && (f_repo_offset == repo_offset))
433
 
                        goto done; // Do not treat this as an error.
434
 
                if (!referenced) {
435
 
                        // If the alias is in use by a non referenced BLOB then delete it.
436
 
                        // This can happen because I allow newly created BLOBs to be accessed
437
 
                        // by their alias even before a reference to the BLOB has been added to
438
 
                        // the database.
439
 
                        af->deleteCurrentRec();
440
 
                } else  {
441
 
#ifdef xxDEBUG
442
 
                        CSL.log(self, CSLog::Protocol, "Alias: ");
443
 
                        CSL.log(self, CSLog::Protocol, alias);
444
 
                        CSL.log(self, CSLog::Protocol, "\n");
445
 
#endif
446
 
                        CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Alias Exists");
447
 
                }
448
 
        }
449
 
                
450
 
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
451
 
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
452
 
        CS_SET_DISK_4(diskRec.ar_hash_4, hash); 
453
 
 
454
 
        af->addRec(&diskRec);
455
 
done:
456
 
        backtopool_(af);
457
 
 
458
 
        unlock_(this);
459
 
        return_(hash);
460
 
}
461
 
 
462
 
//------------------------
463
 
void MSAlias::deleteAlias(MSDiskAliasPtr diskRec)
464
 
{
465
 
        enter_();
466
 
        
467
 
        MSAliasFile *af = getAliasFile();
468
 
        frompool_(af);
469
 
        if (af->findRec(diskRec))
470
 
                af->deleteCurrentRec();
471
 
        backtopool_(af);
472
 
 
473
 
        exit_();
474
 
}
475
 
 
476
 
//------------------------
477
 
void MSAlias::deleteAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
478
 
{
479
 
        MSDiskAliasRec diskRec;
480
 
        
481
 
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
482
 
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
483
 
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);   
484
 
        deleteAlias(&diskRec);
485
 
        
486
 
}
487
 
//------------------------
488
 
void MSAlias::resetAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
489
 
{
490
 
        MSDiskAliasRec diskRec;
491
 
        bool found;
492
 
        enter_();
493
 
        
494
 
        CS_SET_DISK_4(diskRec.ar_repo_id_4, old_repo_id);       
495
 
        CS_SET_DISK_8(diskRec.ar_offset_8, old_repo_offset);    
496
 
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);   
497
 
 
498
 
        lock_(this);
499
 
        
500
 
        MSAliasFile *af = getAliasFile();
501
 
        frompool_(af);
502
 
        found = af->findRec(&diskRec);
503
 
        CS_SET_DISK_4(diskRec.ar_repo_id_4, new_repo_id);       
504
 
        CS_SET_DISK_8(diskRec.ar_offset_8, new_repo_offset);    
505
 
 
506
 
        if (found) 
507
 
                af->updateCurrentRec(&diskRec);
508
 
        else {
509
 
                CSException::logException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Alias doesn't exists");
510
 
                af->addRec(&diskRec);
511
 
        }
512
 
                        
513
 
        backtopool_(af);
514
 
 
515
 
        unlock_(this);
516
 
        exit_();
517
 
}
518
 
 
519
 
//------------------------
520
 
// Check to see if the blob with the given repo_id
521
 
// and repo_offset has the specified alias.
522
 
bool MSAlias::hasBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias, bool *referenced)
523
 
{
524
 
        bool found = false;
525
 
        MSRepoFile *repoFile;
526
 
        MSBlobHeadRec   blob;
527
 
        uint8_t status;
528
 
        uint64_t offset;
529
 
        uint32_t alias_size = strlen(alias) +1;
530
 
        char blob_alias[BLOB_ALIAS_LENGTH +1];
531
 
        
532
 
        if (alias_size > BLOB_ALIAS_LENGTH)
533
 
                return false;
534
 
 
535
 
        enter_();
536
 
        
537
 
        repoFile = iDatabase_br->getRepoFileFromPool(repo_id, false);
538
 
        frompool_(repoFile);
539
 
 
540
 
        repoFile->read(&blob, repo_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
541
 
        status = CS_GET_DISK_1(blob.rb_status_1);
542
 
        if (IN_USE_BLOB_STATUS(status)) {
543
 
                offset = repo_offset + CS_GET_DISK_2(blob.rb_alias_offset_2);
544
 
                
545
 
                blob_alias[BLOB_ALIAS_LENGTH] = 0;
546
 
                if (repoFile->read(blob_alias, offset, alias_size, 0) == alias_size) {
547
 
                        found = !my_charset_utf8_general_ci.strcasecmp(blob_alias, alias);
548
 
                        if (found)
549
 
                                *referenced = (status == MS_BLOB_REFERENCED);
550
 
                }
551
 
        } else {
552
 
                CSException::logException(CS_CONTEXT, MS_ERR_ENGINE, "Deleted BLOB alias found. (Rebuild BLOB alias index.)");
553
 
        }
554
 
        
555
 
                
556
 
        backtopool_(repoFile);  
557
 
 
558
 
        return_(found);
559
 
}
560
 
 
561
 
//------------------------
562
 
bool MSAlias::findBlobByAlias( MSAliasFile *af, const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
563
 
{
564
 
        bool found = false;
565
 
        uint32_t hash, l_repo_id, l_repo_offset;
566
 
        MSDiskAliasPtr diskRec;
567
 
        enter_();
568
 
 
569
 
        push_(af);
570
 
        
571
 
        hash = hashAlias(alias);
572
 
        diskRec = af->findRec(hash);
573
 
        
574
 
        while (diskRec && !found) {
575
 
                l_repo_id = CS_GET_DISK_4(diskRec->ar_repo_id_4);
576
 
                l_repo_offset = CS_GET_DISK_8(diskRec->ar_offset_8);
577
 
                if (hasBlobAlias(l_repo_id, l_repo_offset, alias, referenced))
578
 
                        found = true;
579
 
                else
580
 
                        diskRec = af->nextRec();
581
 
        }
582
 
                
583
 
        if (found) {
584
 
                if (repo_id)
585
 
                        *repo_id = l_repo_id;
586
 
                        
587
 
                if (repo_offset)
588
 
                        *repo_offset = l_repo_offset;
589
 
        }
590
 
        
591
 
        release_(af);
592
 
        return_(found);
593
 
}
594
 
//------------------------
595
 
bool MSAlias::findBlobByAlias( const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
596
 
{
597
 
        bool found;
598
 
        enter_();
599
 
        
600
 
        MSAliasFile *af = getAliasFile();
601
 
        frompool_(af);
602
 
        
603
 
        found = findBlobByAlias(RETAIN(af), alias, referenced, repo_id, repo_offset);
604
 
 
605
 
        backtopool_(af);
606
 
        return_(found);
607
 
}
608
 
 
609
 
//------------------------
610
 
bool MSAlias::blobAliasExists(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
611
 
{
612
 
        bool found;
613
 
        MSDiskAliasRec diskRec;
614
 
        
615
 
        CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);   
616
 
        CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);        
617
 
        CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);   
618
 
 
619
 
        enter_();
620
 
        
621
 
        MSAliasFile *af = getAliasFile();
622
 
        frompool_(af);
623
 
        
624
 
        found = af->findRec(&diskRec);
625
 
 
626
 
        backtopool_(af);
627
 
        return_(found);
628
 
}
629
 
 
630
 
/////////////////////////////////////
631
 
MSSysMeta::MSSysMeta(MSAlias *msa)
632
 
{
633
 
        md_myMSAlias = msa;
634
 
        md_isFileInUse = false;
635
 
        md_NextLink = md_PrevLink = NULL;
636
 
        
637
 
        mtab = MSMetaDataTable::newMSMetaDataTable(RETAIN(msa->iDatabase_br));
638
 
}
639
 
 
640
 
//------------------------
641
 
MSSysMeta::~MSSysMeta()
642
 
{
643
 
        if (mtab)
644
 
                mtab->release();
645
 
 
646
 
        if (md_myMSAlias)
647
 
                md_myMSAlias->release();
648
 
}
649
 
 
650
 
//------------------------
651
 
void MSSysMeta::returnToPool()
652
 
{
653
 
        enter_();
654
 
        push_(this);
655
 
        
656
 
                
657
 
        md_isFileInUse = false;
658
 
                
659
 
        if (!md_myMSAlias->iClosing) {
660
 
                lock_(&md_myMSAlias->iSysTablePoolLock); // It may be better if the pool had it's own lock.
661
 
                md_nextFile = md_myMSAlias->iSysTablePool;
662
 
                md_myMSAlias->iSysTablePool - this;
663
 
                unlock_(&md_myMSAlias->iSysTablePoolLock);
664
 
        }
665
 
                
666
 
        release_(this);
667
 
        exit_();
668
 
}
669
 
//------------------------
670
 
bool MSSysMeta::matchAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
671
 
{
672
 
        mtab->seqScanInit(); 
673
 
        return mtab->matchAlias(repo_id, repo_offset, alias); 
674
 
}
675
 
 
676
 
/////////////////////////////////////
677
 
/////////////////////////////////////
678
 
MSAliasFile::MSAliasFile(MSAliasFileShare *share)
679
 
{
680
 
        ba_share = share;
681
 
        ba_isFileInUse = false;
682
 
        ba_NextLink = ba_PrevLink = NULL;
683
 
        
684
 
        iCurrentRec = 0;
685
 
        iBucketCache = NULL;
686
 
        iStartBucket = iCurrentBucket = NULL;
687
 
        iBucketChain = NULL;
688
 
        iLoading = false;
689
 
        ba_nextFile = NULL;
690
 
        
691
 
        iFile = CSFile::newFile(RETAIN(ba_share->msa_filePath));        
692
 
        iFile->open(CSFile::DEFAULT);
693
 
        
694
 
        
695
 
}
696
 
 
697
 
//------------------------
698
 
MSAliasFile::~MSAliasFile()
699
 
{
700
 
        if (iFile)
701
 
                iFile->release();
702
 
                
703
 
        if (iBucketCache)
704
 
                cs_free(iBucketCache);
705
 
}
706
 
 
707
 
//------------------------
708
 
void MSAliasFile::startLoad()
709
 
{
710
 
        enter_();
711
 
        
712
 
        ASSERT(!iLoading);
713
 
        
714
 
//      iBucketCache = (MSADiskBucketRec*) cs_malloc(BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
715
 
//      memset(iBucketCache, 0, BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
716
 
        iLoading = true;
717
 
        
718
 
        exit_();
719
 
}
720
 
 
721
 
//------------------------
722
 
void MSAliasFile::finishLoad()
723
 
{
724
 
        enter_();
725
 
        ASSERT(iLoading);
726
 
        // Write the bucket cache to disk.
727
 
//      for (iCurrentBucket && iCurrentBucket->getSize()) {
728
 
                // To Be implemented.
729
 
//      }
730
 
//      cs_free(iBucketCache);
731
 
        iBucketCache = NULL;
732
 
        iLoading = false;
733
 
        exit_();
734
 
}
735
 
 
736
 
//------------------------
737
 
void MSAliasFile::returnToPool()
738
 
{
739
 
        enter_();
740
 
        push_(this);
741
 
        
742
 
        if (iLoading) {
743
 
                // If iLoading is still set then probably an exception has been thrown.
744
 
                try_(a) {
745
 
                        finishLoad();
746
 
                }
747
 
                catch_(a) 
748
 
                        iLoading = false;
749
 
                cont_(a);
750
 
        }
751
 
 
752
 
        ba_isFileInUse = false;
753
 
                
754
 
        if (!ba_share->msa_closing) {
755
 
                lock_(&ba_share->msa_poolLock);
756
 
                ba_nextFile = ba_share->msa_pool;
757
 
                ba_share->msa_pool = this;
758
 
                unlock_(&ba_share->msa_poolLock);
759
 
        }
760
 
                
761
 
        release_(this);
762
 
        exit_();
763
 
}
764
 
 
765
 
//------------------------
766
 
// The bucket chain is treated as a circular list.
767
 
bool MSAliasFile::nextBucket(bool with_space)
768
 
{
769
 
        bool have_bucket = false;
770
 
        enter_();
771
 
        
772
 
        while (!have_bucket){
773
 
                if (iCurrentBucket) {
774
 
                        iCurrentBucket = iCurrentBucket->getNextLink();
775
 
                        if (!iCurrentBucket)
776
 
                                iCurrentBucket = iBucketChain->getFront();
777
 
                        if (iCurrentBucket == iStartBucket)
778
 
                                break;
779
 
                } else {
780
 
                        iCurrentBucket = iBucketChain->getFront();
781
 
                        iStartBucket = iCurrentBucket;
782
 
                }
783
 
                
784
 
                if ((iCurrentBucket->getSize() && !with_space) || (with_space && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET))){
785
 
                        // Only read the portion of the bucket containing records.
786
 
                        iCurrentRec = iCurrentBucket->getEndOfRecords(); // The current record is set just beyond the last valid record.
787
 
                        size_t size = iCurrentRec * sizeof(MSDiskAliasRec);             
788
 
                        iFile->read(iBucket, iCurrentBucket->bi_records_offset, size, size);                    
789
 
                        have_bucket = true;
790
 
                }
791
 
        }
792
 
        
793
 
        return_(have_bucket);
794
 
}
795
 
 
796
 
//------------------------
797
 
MSDiskAliasPtr MSAliasFile::nextRec()
798
 
{
799
 
        MSDiskAliasPtr rec = NULL;
800
 
        bool have_rec;
801
 
        enter_();
802
 
        
803
 
        while ((!(have_rec = scanBucket())) && nextBucket(false));
804
 
        
805
 
        if (have_rec) 
806
 
                rec = &(iBucket[iCurrentRec]);
807
 
                
808
 
        return_(rec);
809
 
}
810
 
 
811
 
//------------------------
812
 
// When starting a search:
813
 
// If a bucket is already loaded and it is in the correct bucket chain
814
 
// then search it first. In this case then the search starts at the current
815
 
// bucket in the chain.
816
 
//
817
 
// Searches are from back to front with the idea that the more recently
818
 
// added objects will be seached for more often and they are more likely
819
 
// to be at the end of the chain.
820
 
MSDiskAliasPtr MSAliasFile::findRec(uint32_t hash)
821
 
{
822
 
        MSDiskAliasPtr rec = NULL;
823
 
        MSABucketLinkedList *list = ba_share->getBucketChain(hash);
824
 
        enter_();
825
 
        
826
 
        CS_SET_DISK_4(iDiskHash_4, hash);
827
 
        if (list == iBucketChain) {
828
 
                // The search is performed back to front.
829
 
                iCurrentRec = iCurrentBucket->getEndOfRecords();  // Position the start just beyond the last valid record.
830
 
                iStartBucket = iCurrentBucket;
831
 
                if (scanBucket()) {
832
 
                        rec = &(iBucket[iCurrentRec]);
833
 
                        goto done;
834
 
                }
835
 
        } else {
836
 
                iBucketChain = list;
837
 
                iCurrentBucket = NULL;
838
 
                iStartBucket = NULL;
839
 
        }
840
 
 
841
 
        if (nextBucket(false))
842
 
                rec = nextRec();
843
 
                
844
 
done:
845
 
        return_(rec);
846
 
}
847
 
 
848
 
//------------------------
849
 
bool MSAliasFile::findRec(MSDiskAliasPtr theRec)
850
 
{
851
 
        MSDiskAliasPtr aRec = NULL;
852
 
        bool found = false;
853
 
        enter_();
854
 
        
855
 
        aRec = findRec(CS_GET_DISK_4(theRec->ar_hash_4));
856
 
        while ( aRec && !found) {
857
 
                if (CS_EQ_DISK_4(aRec->ar_repo_id_4, theRec->ar_repo_id_4) && CS_EQ_DISK_8(aRec->ar_offset_8, theRec->ar_offset_8))
858
 
                        found = true;
859
 
                else
860
 
                        aRec = nextRec();
861
 
        }       
862
 
        return_(found);
863
 
}
864
 
 
865
 
//------------------------
866
 
void MSAliasFile::addRec(MSDiskAliasPtr new_rec)
867
 
{
868
 
        MSABucketLinkedList *list = ba_share->getBucketChain(CS_GET_DISK_4(new_rec->ar_hash_4));
869
 
        enter_();
870
 
        lock_(&ba_share->msa_writeLock);
871
 
 
872
 
        if (iBucketChain != list) {
873
 
                iBucketChain = list;
874
 
                iCurrentBucket = NULL;
875
 
                iStartBucket = NULL;
876
 
        } else 
877
 
                iStartBucket = iCurrentBucket;
878
 
 
879
 
        if ((iCurrentBucket && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET)) || nextBucket(true)) { // Find a bucket with space in it for a record.
880
 
                uint32_t size = iCurrentBucket->getSize();
881
 
                uint32_t end_of_records = iCurrentBucket->getEndOfRecords();
882
 
 
883
 
                if (size == end_of_records) { // No holes in the recored list
884
 
                        iCurrentRec = end_of_records;                   
885
 
                } else { // Search for the empty record
886
 
                        iCurrentRec = end_of_records -2;                        
887
 
                        while (iCurrentRec && !CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4))
888
 
                                iCurrentRec--;
889
 
                                
890
 
                        ASSERT(CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4));
891
 
                }
892
 
                
893
 
                memcpy(&iBucket[iCurrentRec], new_rec, sizeof(MSDiskAliasRec)); // Add the record to the cached bucket.
894
 
                
895
 
                iCurrentBucket->recAdded(iFile, iCurrentRec); // update the current bucket header.
896
 
        } else { // A new bucket must be added to the chain.
897
 
                MSADiskBucketHeadRec new_bucket = {0};
898
 
                CSDiskValue8 disk_8_value;
899
 
                uint64_t new_bucket_offset;
900
 
                MSABucketInfo *next, *prev;
901
 
                
902
 
                next = iBucketChain->getFront();
903
 
                prev = iBucketChain->getBack();
904
 
                
905
 
                // Set the next and prev bucket offsets in the new bucket record.
906
 
                CS_SET_DISK_8(new_bucket.ab_prev_bucket_8, prev->bi_bucket_offset);
907
 
                CS_SET_DISK_8(new_bucket.ab_next_bucket_8, next->bi_bucket_offset);
908
 
                
909
 
                if (ba_share->msa_empty_buckets.getSize()) { // Get a bucket from the empty bucket list.
910
 
                        MSABucketInfo *empty_bucket = ba_share->msa_empty_buckets.removeFront();
911
 
                        
912
 
                        new_bucket_offset = empty_bucket->bi_bucket_offset;                     
913
 
                        empty_bucket->release();
914
 
                        
915
 
                        // Update the index file's empty bucket list 
916
 
                        if (ba_share->msa_empty_buckets.getSize() == 0) 
917
 
                                CS_SET_NULL_DISK_8(disk_8_value);
918
 
                        else
919
 
                                CS_SET_DISK_8(disk_8_value, iBucketChain->getFront()->bi_bucket_offset);
920
 
                        
921
 
                        iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_free_list_8) , 8); 
922
 
                } else // There are no empty buckets so grow the file.
923
 
                        new_bucket_offset = ba_share->msa_fileSize;
924
 
                        
925
 
                // Write the new bucket's record header to the file
926
 
                iFile->write(&new_bucket, new_bucket_offset, sizeof(MSADiskBucketHeadRec)); 
927
 
                
928
 
                // Insert the new bucket into the bucket chain on the disk.
929
 
                CS_SET_DISK_8(disk_8_value, new_bucket_offset);
930
 
                iFile->write(&disk_8_value, prev->bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_next_bucket_8), 8); 
931
 
                iFile->write(&disk_8_value, next->bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_prev_bucket_8), 8); 
932
 
                
933
 
                // Update the file size in the file header if required
934
 
                if (ba_share->msa_fileSize == new_bucket_offset) {
935
 
                        ba_share->msa_fileSize += sizeof(MSADiskBucketRec); // Note this is MSADiskBucketRec not MSADiskBucketHeadRec
936
 
 
937
 
                        CS_SET_DISK_8(disk_8_value, ba_share->msa_fileSize);
938
 
                        iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_file_size_8) , 8); 
939
 
                }
940
 
                
941
 
                // Add the info rec into the bucket chain in RAM.
942
 
                iCurrentBucket = MSABucketInfo::newMSABucketInfo(new_bucket_offset, 1, 0);
943
 
                iBucketChain->addFront(iCurrentBucket);
944
 
                iCurrentRec = 0;
945
 
        }
946
 
        
947
 
        uint64_t offset;
948
 
        offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
949
 
 
950
 
        // Write the new index entry to the index file.
951
 
        iFile->write(new_rec, offset, sizeof(MSDiskAliasRec)); 
952
 
        
953
 
        unlock_(&ba_share->msa_writeLock);
954
 
        
955
 
        exit_();        
956
 
}
957
 
//------------------------
958
 
void MSAliasFile::deleteCurrentRec()
959
 
{
960
 
        MSDiskAliasPtr rec = &(iBucket[iCurrentRec]);
961
 
        uint64_t        offset;
962
 
        enter_();
963
 
        
964
 
        CS_SET_NULL_DISK_4(rec->ar_repo_id_4);
965
 
        offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
966
 
        
967
 
        lock_(&ba_share->msa_writeLock);
968
 
 
969
 
        // Update the index file. It is assumed that repo_id is the first 4 bytes of 'rec'.
970
 
        iFile->write(rec, offset, 4); 
971
 
        
972
 
        iCurrentBucket->recRemoved(iFile, iCurrentRec, iBucket);
973
 
        
974
 
        unlock_(&ba_share->msa_writeLock);
975
 
        
976
 
        exit_();        
977
 
}
978
 
 
979
 
//------------------------
980
 
void MSAliasFile::updateCurrentRec(MSDiskAliasPtr update_rec)
981
 
{
982
 
        uint64_t        offset;
983
 
        enter_();
984
 
        
985
 
        // ASSERT that the updated rec still belongs to this bucket chain.
986
 
        ASSERT(ba_share->getBucketChain(CS_GET_DISK_4(update_rec->ar_hash_4)) == iBucketChain);
987
 
        ASSERT(!CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4)); // We should not be updating a deleted record.
988
 
        
989
 
        lock_(&ba_share->msa_writeLock);
990
 
        offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
991
 
        
992
 
        // Update the record on disk.
993
 
        iFile->write(update_rec, offset, sizeof(MSDiskAliasRec));
994
 
        
995
 
        // Update the record in memory. 
996
 
        CS_COPY_DISK_4(iBucket[iCurrentRec].ar_repo_id_4, update_rec->ar_repo_id_4);    
997
 
        CS_COPY_DISK_8(iBucket[iCurrentRec].ar_offset_8, update_rec->ar_offset_8);      
998
 
 
999
 
        unlock_(&ba_share->msa_writeLock);
1000
 
        exit_();        
1001
 
}
1002
 
 
1003
 
 
1004
 
//------------------------
1005
 
MSABucketInfo *MSABucketInfo::newMSABucketInfo(uint64_t offset, uint32_t num, uint32_t last)
1006
 
{
1007
 
        MSABucketInfo *bucket;
1008
 
        new_(bucket, MSABucketInfo(offset, num, last));
1009
 
        return bucket;
1010
 
}
1011
 
//------------------------
1012
 
void MSABucketInfo::recRemoved(CSFile *iFile, uint32_t idx, MSDiskAliasRec bucket[])
1013
 
{
1014
 
        MSADiskBucketHeadRec head;
1015
 
        enter_();
1016
 
        
1017
 
        ASSERT(idx < bi_end_of_records);
1018
 
        
1019
 
        bi_num_recs--;
1020
 
        if (!bi_num_recs) {
1021
 
                // It would be nice to remove this bucket from the 
1022
 
                // bucket list and place it on the empty list.
1023
 
                // Before this can be done a locking method would
1024
 
                // be needed to block anyone from reading this
1025
 
                // bucket while it was being moved.
1026
 
                //
1027
 
                // I haven't done this because I have been trying
1028
 
                // to avoid read locks.
1029
 
                bi_end_of_records = 0;
1030
 
        } else if ((bi_end_of_records -1) == idx) {
1031
 
                while (idx && CS_IS_NULL_DISK_4(bucket[idx].ar_repo_id_4))
1032
 
                        idx--;
1033
 
                        
1034
 
                if ((idx ==0) && CS_IS_NULL_DISK_4(bucket[0].ar_repo_id_4))
1035
 
                        bi_end_of_records = 0;
1036
 
                else
1037
 
                        bi_end_of_records = idx +1;
1038
 
                
1039
 
                ASSERT(bi_end_of_records >= bi_num_recs);
1040
 
        }
1041
 
        
1042
 
        // Update the index file.
1043
 
        CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
1044
 
        CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
1045
 
        iFile->write(&head.ab_num_recs_4, bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8); 
1046
 
        exit_();
1047
 
}
1048
 
 
1049
 
//------------------------
1050
 
void MSABucketInfo::recAdded(CSFile *iFile, uint32_t idx)
1051
 
{
1052
 
        MSADiskBucketHeadRec head;
1053
 
        enter_();
1054
 
        
1055
 
        ASSERT(bi_num_recs < NUM_RECORDS_PER_BUCKET);
1056
 
        ASSERT(idx < NUM_RECORDS_PER_BUCKET);
1057
 
 
1058
 
        bi_num_recs++;
1059
 
        if (idx == bi_end_of_records)
1060
 
                bi_end_of_records++;
1061
 
                
1062
 
        // Update the index file.
1063
 
        CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
1064
 
        CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
1065
 
        iFile->write(&head.ab_num_recs_4, bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8); 
1066
 
        exit_();        
1067
 
}
1068
 
 
1069
 
//////////////////////////////////
1070
 
MSAliasFile *MSAliasFileShare::getPoolFile()
1071
 
{
1072
 
        MSAliasFile *af;
1073
 
        enter_();
1074
 
        
1075
 
        lock_(&msa_poolLock); 
1076
 
        if ((af = msa_pool)) {
1077
 
                msa_pool = af->ba_nextFile;
1078
 
        } else {
1079
 
                new_(af, MSAliasFile(this));
1080
 
                msa_poolFiles.addFront(af);
1081
 
        }
1082
 
        unlock_(&msa_poolLock);
1083
 
        
1084
 
        af->ba_nextFile = NULL;
1085
 
        ASSERT(!af->ba_isFileInUse);
1086
 
        af->ba_isFileInUse = true;
1087
 
        af->retain();
1088
 
        
1089
 
        return_(af);
1090
 
}
1091
 
#endif // HAVE_ALIAS_SUPPORT
1092