~drizzle-trunk/drizzle/development

1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 *
 * PrimeBase Media Stream for MySQL
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Barry Leslie
 *
 * 2008-12-30
 *
 * H&G2JCtL
 *
 * BLOB alias index.
 *
 */
28
29
#ifdef HAVE_ALIAS_SUPPORT
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
30
#include "cslib/CSConfig.h"
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
31
32
#include "string.h"
33
34
#ifdef DRIZZLED
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
35
#include <drizzled/common.h>
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
36
#endif
37
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
38
#include "cslib/CSGlobal.h"
39
#include "cslib/CSLog.h"
40
#include "cslib/CSStrUtil.h"
41
#include "cslib/CSFile.h"
1548.2.10 by Barry.Leslie at PrimeBase
Merge from trunk.
42
#include "system_table_ms.h"
43
#include "database_ms.h"
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
44
45
#include "alias_ms.h"
46
47
48
49
//------------------------
50
// Destructor. Releases the references held on the index file path and on the
// shared file state. If iDelete is set, the index file is removed from disk
// before the path reference is dropped.
MSAlias::~MSAlias()
{
	enter_();

	// The object is expected to be closed (iClosing is set by ma_close())
	// and its system table pool to be empty by now.
	ASSERT(iClosing);
	ASSERT(iPoolSysTables.getSize() == 0);

	if (iFilePath != NULL) {
		if (iDelete) {
			iFilePath->removeFile();
		}
		iFilePath->release();
	}

	if (iFileShare != NULL) {
		iFileShare->release();
	}

	exit_();
}
71
72
//------------------------
73
// Constructor. Records the owning database (stored as given, no retain is
// performed here) and starts with no file path, no shared file state and
// both the closing and delete flags cleared.
MSAlias::MSAlias(MSDatabase *db_noref)
{
	iDatabase_br = db_noref;

	iFilePath = NULL;
	iFileShare = NULL;

	iDelete = false;
	iClosing = false;
}
81
82
//------------------------
83
void MSAlias::ma_close()
84
{
85
	enter_();
86
	
87
	iClosing = true;
88
	if (iFileShare)
89
		iFileShare->close();
90
	iPoolSysTables.clear();
91
	exit_();
92
}
93
94
//------------------------
95
// Compress the index bucket chain and free unused buckets.
96
void MSAlias::MSAliasCompress(CSFile *fa, CSSortedList	*freeList, MSABucketLinkedList *bucketChain)
97
{
98
	// For now I will just remove empty buckets. 
99
	// Later this function should also compress the records also 
100
	// thus making the searches faster and freeing up more space.
101
	MSABucketInfo *b_info, *next;
102
	
103
	b_info = bucketChain->getFront();
104
	while (b_info) {
105
		next = b_info->getNextLink();
106
		if (b_info->getSize() == 0) {
107
			bucketChain->remove(RETAIN(b_info));
108
			freeList->add(b_info);
109
		}		
110
		b_info = next;
111
	}
112
		
113
}
114
115
//------------------------
116
void MSAlias::MSAliasLoad()
117
{	
118
	CSFile			*fa = NULL;
119
	CSSortedList	freeList;
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
120
	off64_t			fileSize;
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
121
122
	enter_();
123
	
124
	fa = CSFile::newFile(RETAIN(iFilePath));
125
	push_(fa);
126
127
	MSAliasHeadRec header;
128
	uint64_t free_list_offset;
129
	fa->open(CSFile::DEFAULT);
130
	fa->read(&header, 0, sizeof(header), sizeof(header));
131
	
132
	/* Check the file header: */
133
	if (CS_GET_DISK_4(header.ah_magic_4) != MS_ALIAS_FILE_MAGIC)
134
		CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_HEADER_MAGIC);
135
	if (CS_GET_DISK_2(header.ah_version_2) != MS_ALIAS_FILE_VERSION)
136
		CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_VERSION_TOO_NEW);
137
		
138
	free_list_offset = CS_GET_DISK_8(header.ah_free_list_8);
139
	
140
	fileSize = CS_GET_DISK_8(header.ah_file_size_8);
141
142
	// Do some sanity checks. 
143
	if (CS_GET_DISK_2(header.ah_head_size_2) != sizeof(header))
144
		CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
145
	
146
	if (CS_GET_DISK_2(header.ah_num_buckets_2) != BUCKET_LIST_SIZE)
147
		CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
148
	
149
	if (CS_GET_DISK_4(header.ah_bucket_size_4) != NUM_RECORDS_PER_BUCKET)
150
		CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
151
	
152
	if (fileSize != fa->getEOF()) 
153
		CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
154
	
155
	// Load the bucket headers into RAM
156
	MSADiskBucketHeadRec bucketHead = {0};
157
	uint64_t offset, start_offset;
158
159
	// Fist load the free list:
160
	if (free_list_offset) {
161
		start_offset = offset = free_list_offset;
162
		do {
163
			fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
164
			freeList.add(MSABucketInfo::newMSABucketInfo(offset));
165
			offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);					
166
		} while (offset != start_offset);
167
		
168
	}
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
169
	for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
170
		uint64_t used, total_space;
171
		MSABucketLinkedList *bucketChain = &(iFileShare->msa_buckets[i]);
172
		
173
		start_offset = offset = sizeof(header) + i * sizeof(MSADiskBucketRec);
174
		used = total_space = 0;
175
		do {
176
			uint32_t num, end_of_records;
177
			
178
			fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
179
			num = CS_GET_DISK_4(bucketHead.ab_num_recs_4);
180
			end_of_records = CS_GET_DISK_4(bucketHead.ab_eor_rec_4);
181
			total_space += NUM_RECORDS_PER_BUCKET;
182
			used += num;
183
			bucketChain->addFront(MSABucketInfo::newMSABucketInfo(offset, num, end_of_records));
184
			offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);
185
			
186
		} while (offset != start_offset);
187
		
188
		// Pack the index if required
189
		if (((total_space - used) /  NUM_RECORDS_PER_BUCKET) > 1) 
190
			MSAliasCompress(fa, &freeList, bucketChain); 
191
			
192
	}
193
	
194
	// If there are free buckets try to free up some disk
195
	// space or add them to a free list to be reused later.
196
	if (freeList.getSize()) {
197
		uint64_t last_bucket = fileSize - sizeof(MSADiskBucketRec);
198
		MSABucketInfo *rec;
199
		bool reduce = false;
200
		
201
		// Search for freed buckets at the end of the file
202
		// so that they can be released and the file
203
		// shrunk.
204
		//
205
		// The free list has been sorted so that buckets
206
		// with the highest file offset are first.
207
		do {
208
			rec = (MSABucketInfo*) freeList.itemAt(0);
209
			if (rec->bi_bucket_offset != last_bucket);
210
				break;
211
				
212
			last_bucket -= sizeof(MSADiskBucketRec);
213
			freeList.remove(rec);
214
			reduce = true;
215
		} while (freeList.getSize());
216
		
217
		if (reduce) {
218
			// The file can be reduced in size.
219
			fileSize = last_bucket + sizeof(MSADiskBucketRec);	
220
			fa->setEOF(fileSize);
221
			CS_SET_DISK_8(header.ah_file_size_8, fileSize);
222
			fa->write(&header.ah_file_size_8, offsetof(MSAliasHeadRec,ah_file_size_8) , 8); 
223
		}
224
		
225
		// Add the empty buckets to the index file's empty bucket list.
226
		memset(&bucketHead, 0, sizeof(bucketHead));
227
		offset = 0;
228
		while (freeList.getSize()) { // Add the empty buckets to the empty_bucket list.
229
			rec = (MSABucketInfo*) freeList.takeItemAt(0);
230
			
231
			// buckets are added to the front of the list.
232
			fa->write(&offset, rec->bi_bucket_offset + offsetof(MSADiskBucketHeadRec,ab_next_bucket_8) , 8);
233
			offset =  rec->bi_bucket_offset;
234
			fa->write(&offset, offsetof(MSAliasHeadRec,ah_free_list_8) , 8); 
235
			
236
			iFileShare->msa_empty_buckets.addFront(rec);
237
		}
238
	}
239
	
240
	iFileShare->msa_fileSize = fa->getEOF();
241
	
242
	release_(fa);
243
	exit_();
244
}
245
246
//------------------------
247
void MSAlias::buildAliasIndex()
248
{
249
	MSBlobHeadRec	blob;
250
	MSRepository	*repo;
251
	uint64_t			blob_size, fileSize, offset;
252
	uint16_t			head_size;
253
	MSAliasFile		*afile;
254
	MSAliasRec		aliasRec;
255
	
256
	enter_();
257
	
258
	afile = getAliasFile();
259
	frompool_(afile);
260
261
	afile->startLoad();
262
263
	CSSyncVector	*repo_list = iDatabase_br->getRepositoryList();
264
	
265
	// No locking is required since the index is loaded before the database is opened
266
	// and the compactor thread is started.
267
268
	for (uint32_t repo_index =0; repo_index<repo_list->size(); repo_index++) {
269
		if ((repo = (MSRepository *) repo_list->get(repo_index))) {
270
			MSRepoFile	*repoFile = repo->openRepoFile();
271
			push_(repoFile);
272
			fileSize = repo->getRepoFileSize();
273
			offset = repo->getRepoHeadSize();
274
			
275
			aliasRec.repo_id = repoFile->myRepo->getRepoID();
276
			
277
			while (offset < fileSize) {
278
				if (repoFile->read(&blob, offset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) 
279
					break;
280
					
281
				if ((CS_GET_DISK_1(blob.rb_status_1) == MS_BLOB_REFERENCED) && CS_GET_DISK_2(blob.rb_alias_offset_2)) {
282
					aliasRec.repo_offset = offset;
283
					aliasRec.alias_hash = CS_GET_DISK_4(blob.rb_alias_hash_4);
284
					addAlias(afile, &aliasRec);
285
				}
286
				
287
				head_size = CS_GET_DISK_2(blob.rb_head_size_2);
288
				blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
289
				offset += head_size + blob_size;
290
			}
291
			
292
			release_(repoFile);
293
		}
294
	}
295
	
296
	afile->finishLoad();
297
	backtopool_(afile);
298
299
	exit_();
300
}
301
302
//------------------------
303
void MSAlias::MSAliasBuild()
304
{
305
	CSFile *fa;
306
	MSAliasHeadRec header = {0};
307
	uint64_t offset, size = sizeof(header) + BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec);
308
	enter_();
309
	
310
	fa = CSFile::newFile(RETAIN(iFilePath));
311
	push_(fa);
312
313
	fa->open(CSFile::CREATE | CSFile::TRUNCATE);
314
315
	// Create an empty index with 1 empty bucket in each bucket chain.	
316
317
	CS_SET_DISK_4(header.ah_magic_4,  MS_ALIAS_FILE_MAGIC);
318
	CS_SET_DISK_2(header.ah_version_2, MS_ALIAS_FILE_VERSION);
319
		
320
	CS_SET_DISK_2(header.ah_head_size_2, sizeof(header));
321
	CS_SET_DISK_8(header.ah_file_size_8, size);
322
	
323
	CS_SET_DISK_2(header.ah_num_buckets_2, BUCKET_LIST_SIZE);
324
	CS_SET_DISK_2(header.ah_bucket_size_4, NUM_RECORDS_PER_BUCKET);
325
	
326
	fa->setEOF(size); // Grow the file.
327
	fa->write(&header, 0, sizeof(header));
328
329
	offset = sizeof(header);
330
	
331
	// Initialize the file bucket chains.
332
	MSADiskBucketHeadRec bucketHead = {0};
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
333
	for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
334
		CS_SET_DISK_8(bucketHead.ab_prev_bucket_8, offset);
335
		CS_SET_DISK_8(bucketHead.ab_next_bucket_8, offset);
336
		fa->write(&bucketHead, offset, sizeof(MSADiskBucketHeadRec));
337
		// Add the bucket to the RAM based list.
338
		iFileShare->msa_buckets[i].addFront(MSABucketInfo::newMSABucketInfo(offset));
339
		offset += sizeof(MSADiskBucketRec); // NOTE: MSADiskBucketRec not MSADiskBucketHeadRec
340
	}
341
	
342
	fa->sync();
343
	
344
	
345
	
346
	fa->close();
347
	
348
	release_(fa);
349
	
350
	// Scan through all the BLOBs in the repository and add an entry
351
	// for each blob alias.
352
	buildAliasIndex();
353
354
	exit_();
355
}
356
357
//------------------------
358
void MSAlias::ma_open(const char *file_name)
359
{
360
	bool isdir = false;
361
	
362
	enter_();
363
364
	iFilePath = CSPath::newPath(RETAIN(iDatabase_br->myDatabasePath), file_name);
365
	
366
retry:
367
	new_(iFileShare, MSAliasFileShare(RETAIN(iFilePath)));
368
	
369
	if (iFilePath->exists(&isdir)) {
370
		try_(a) {
371
			MSAliasLoad(); 
372
		}
373
		catch_(a) {
374
			// If an error occurs delete the index and rebuild it.
375
			self->myException.log(NULL);
376
			iFileShare->release();
377
			iFilePath->removeFile();
378
			goto retry;
379
		}
380
		cont_(a);
381
	} else
382
		MSAliasBuild();
383
	
384
	
385
	exit_();
386
}
387
388
//------------------------
389
// Computes the case-insensitive hash of a NUL terminated alias.
//
// Each character is upper-cased and folded into a 32 bit value, 4 bits at a
// time; whenever the top 4 bits become non-zero they are folded back into the
// lower bits and cleared.
//
// The character is converted to 'unsigned char' before being passed to
// toupper(): passing a negative 'char' value is undefined behaviour. The
// result for plain ASCII aliases is unchanged.
uint32_t MSAlias::hashAlias(const char *ptr)
{
	uint32_t h = 0, g;

	while (*ptr) {
		h = (h << 4) + (uint32_t) toupper((unsigned char) *ptr++);
		if ((g = (h & 0xF0000000)))
			h = (h ^ (g >> 24)) ^ g;
	}

	return (h);
}
401
402
//------------------------
403
// Converts an in-memory alias record to its disk representation and appends
// it to the index through the given alias file. No duplicate check is done.
void MSAlias::addAlias(MSAliasFile *af, MSAliasRec *rec)
{
	MSDiskAliasRec disk_entry;

	CS_SET_DISK_4(disk_entry.ar_repo_id_4, rec->repo_id);
	CS_SET_DISK_8(disk_entry.ar_offset_8, rec->repo_offset);
	CS_SET_DISK_4(disk_entry.ar_hash_4, rec->alias_hash);

	af->addRec(&disk_entry);
}
412
413
//------------------------
414
uint32_t MSAlias::addAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
415
{
416
	MSDiskAliasRec diskRec;
417
	uint32_t hash;
418
	uint32_t f_repo_id;
419
	uint64_t f_repo_offset;
420
	bool referenced = false;
421
	enter_();
422
	
423
	hash = hashAlias(alias);
424
	
425
	// Use a lock to make sure that the same alias cannot be added at the same time.
426
	lock_(this);
427
	
428
	MSAliasFile *af = getAliasFile();
429
	frompool_(af);
430
431
	if (findBlobByAlias(RETAIN(af), alias, &referenced, &f_repo_id, &f_repo_offset)) {
432
		if ((f_repo_id == repo_id) && (f_repo_offset == repo_offset))
433
			goto done; // Do not treat this as an error.
434
		if (!referenced) {
435
			// If the alias is in use by a non referenced BLOB then delete it.
436
			// This can happen because I allow newly created BLOBs to be accessed
437
			// by their alias even before a reference to the BLOB has been added to
438
			// the database.
439
			af->deleteCurrentRec();
440
		} else	{
441
#ifdef xxDEBUG
442
			CSL.log(self, CSLog::Protocol, "Alias: ");
443
			CSL.log(self, CSLog::Protocol, alias);
444
			CSL.log(self, CSLog::Protocol, "\n");
445
#endif
446
			CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Alias Exists");
447
		}
448
	}
449
		
450
	CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);	
451
	CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);	
452
	CS_SET_DISK_4(diskRec.ar_hash_4, hash);	
453
454
	af->addRec(&diskRec);
455
done:
456
	backtopool_(af);
457
458
	unlock_(this);
459
	return_(hash);
460
}
461
462
//------------------------
463
// Removes the index entry matching 'diskRec', if such an entry exists.
void MSAlias::deleteAlias(MSDiskAliasPtr diskRec)
{
	enter_();

	MSAliasFile *alias_file = getAliasFile();
	frompool_(alias_file);

	const bool present = alias_file->findRec(diskRec);
	if (present) {
		alias_file->deleteCurrentRec();
	}

	backtopool_(alias_file);

	exit_();
}
475
476
//------------------------
477
void MSAlias::deleteAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
478
{
479
	MSDiskAliasRec diskRec;
480
	
481
	CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);	
482
	CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);	
483
	CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);	
484
	deleteAlias(&diskRec);
485
	
486
}
487
//------------------------
488
void MSAlias::resetAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
489
{
490
	MSDiskAliasRec diskRec;
491
	bool found;
492
	enter_();
493
	
494
	CS_SET_DISK_4(diskRec.ar_repo_id_4, old_repo_id);	
495
	CS_SET_DISK_8(diskRec.ar_offset_8, old_repo_offset);	
496
	CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);	
497
498
	lock_(this);
499
	
500
	MSAliasFile *af = getAliasFile();
501
	frompool_(af);
502
	found = af->findRec(&diskRec);
503
	CS_SET_DISK_4(diskRec.ar_repo_id_4, new_repo_id);	
504
	CS_SET_DISK_8(diskRec.ar_offset_8, new_repo_offset);	
505
506
	if (found) 
507
		af->updateCurrentRec(&diskRec);
508
	else {
509
		CSException::logException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Alias doesn't exists");
510
		af->addRec(&diskRec);
511
	}
512
			
513
	backtopool_(af);
514
515
	unlock_(this);
516
	exit_();
517
}
518
519
//------------------------
520
// Check to see if the blob with the given repo_id
521
// and repo_offset has the specified alias.
522
bool MSAlias::hasBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias, bool *referenced)
523
{
524
	bool found = false;
525
	MSRepoFile *repoFile;
526
	MSBlobHeadRec	blob;
527
	uint8_t status;
528
	uint64_t offset;
529
	uint32_t alias_size = strlen(alias) +1;
530
	char blob_alias[BLOB_ALIAS_LENGTH +1];
531
	
532
	if (alias_size > BLOB_ALIAS_LENGTH)
533
		return false;
534
535
	enter_();
536
	
537
	repoFile = iDatabase_br->getRepoFileFromPool(repo_id, false);
538
	frompool_(repoFile);
539
540
	repoFile->read(&blob, repo_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
541
	status = CS_GET_DISK_1(blob.rb_status_1);
542
	if (IN_USE_BLOB_STATUS(status)) {
543
		offset = repo_offset + CS_GET_DISK_2(blob.rb_alias_offset_2);
544
		
545
		blob_alias[BLOB_ALIAS_LENGTH] = 0;
546
		if (repoFile->read(blob_alias, offset, alias_size, 0) == alias_size) {
547
			found = !my_strcasecmp(&my_charset_utf8_general_ci, blob_alias, alias);
548
			if (found)
549
				*referenced = (status == MS_BLOB_REFERENCED);
550
		}
551
	} else {
552
		CSException::logException(CS_CONTEXT, MS_ERR_ENGINE, "Deleted BLOB alias found. (Rebuild BLOB alias index.)");
553
	}
554
	
555
		
556
	backtopool_(repoFile);	
557
558
	return_(found);
559
}
560
561
//------------------------
562
// Looks up the BLOB that owns 'alias' using the given alias file.
//
// All index entries carrying the alias hash are visited until one is found
// whose BLOB really has this alias (see hasBlobAlias()). On success the
// location is stored in '*repo_id' / '*repo_offset' (each may be NULL) and
// '*referenced' is set by hasBlobAlias(). On success the alias file is left
// positioned on the matching record.
//
// The reference on 'af' passed in is released before returning.
bool MSAlias::findBlobByAlias( MSAliasFile *af, const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
{
	bool found = false;
	uint32_t hash, l_repo_id = 0;
	// Repository offsets are 8 bytes on disk; a 32 bit local would truncate
	// offsets beyond 4GB.
	uint64_t l_repo_offset = 0;
	MSDiskAliasPtr diskRec;
	enter_();

	push_(af);

	hash = hashAlias(alias);
	diskRec = af->findRec(hash);

	while (diskRec && !found) {
		l_repo_id = CS_GET_DISK_4(diskRec->ar_repo_id_4);
		l_repo_offset = CS_GET_DISK_8(diskRec->ar_offset_8);
		if (hasBlobAlias(l_repo_id, l_repo_offset, alias, referenced))
			found = true;
		else
			diskRec = af->nextRec();
	}

	if (found) {
		if (repo_id)
			*repo_id = l_repo_id;

		if (repo_offset)
			*repo_offset = l_repo_offset;
	}

	release_(af);
	return_(found);
}
594
//------------------------
595
bool MSAlias::findBlobByAlias( const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
596
{
597
	bool found;
598
	enter_();
599
	
600
	MSAliasFile *af = getAliasFile();
601
	frompool_(af);
602
	
603
	found = findBlobByAlias(RETAIN(af), alias, referenced, repo_id, repo_offset);
604
605
	backtopool_(af);
606
	return_(found);
607
}
608
609
//------------------------
610
bool MSAlias::blobAliasExists(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
611
{
612
	bool found;
613
	MSDiskAliasRec diskRec;
614
	
615
	CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id);	
616
	CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);	
617
	CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);	
618
619
	enter_();
620
	
621
	MSAliasFile *af = getAliasFile();
622
	frompool_(af);
623
	
624
	found = af->findRec(&diskRec);
625
626
	backtopool_(af);
627
	return_(found);
628
}
629
630
/////////////////////////////////////
631
// Constructor. Remembers the owning alias index, starts unlinked and not in
// use, and creates a metadata table on the alias index's database.
MSSysMeta::MSSysMeta(MSAlias *msa)
{
	md_NextLink = NULL;
	md_PrevLink = NULL;
	md_isFileInUse = false;
	md_myMSAlias = msa;

	mtab = MSMetaDataTable::newMSMetaDataTable(RETAIN(msa->iDatabase_br));
}
639
640
//------------------------
641
// Destructor. Releases the metadata table and the alias index reference,
// when present.
MSSysMeta::~MSSysMeta()
{
	if (mtab != NULL) {
		mtab->release();
	}

	if (md_myMSAlias != NULL) {
		md_myMSAlias->release();
	}
}
649
650
//------------------------
651
void MSSysMeta::returnToPool()
652
{
653
	enter_();
654
	push_(this);
655
	
656
		
657
	md_isFileInUse = false;
658
		
659
	if (!md_myMSAlias->iClosing) {
660
		lock_(&md_myMSAlias->iSysTablePoolLock); // It may be better if the pool had it's own lock.
661
		md_nextFile = md_myMSAlias->iSysTablePool;
662
		md_myMSAlias->iSysTablePool - this;
663
		unlock_(&md_myMSAlias->iSysTablePoolLock);
664
	}
665
		
666
	release_(this);
667
	exit_();
668
}
669
//------------------------
670
bool MSSysMeta::matchAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
671
{
672
	mtab->seqScanInit(); 
673
	return mtab->matchAlias(repo_id, repo_offset, alias); 
674
}
675
676
/////////////////////////////////////
677
/////////////////////////////////////
678
// Constructor. Starts with no current bucket chain, bucket or record, then
// opens its own handle on the shared index file.
MSAliasFile::MSAliasFile(MSAliasFileShare *share)
{
	// Pool bookkeeping.
	ba_share = share;
	ba_isFileInUse = false;
	ba_nextFile = NULL;
	ba_NextLink = NULL;
	ba_PrevLink = NULL;

	// Search position.
	iBucketChain = NULL;
	iStartBucket = NULL;
	iCurrentBucket = NULL;
	iCurrentRec = 0;

	// Load state.
	iBucketCache = NULL;
	iLoading = false;

	iFile = CSFile::newFile(RETAIN(ba_share->msa_filePath));
	iFile->open(CSFile::DEFAULT);
}
696
697
//------------------------
698
// Destructor. Releases the file handle and frees the bucket cache, when
// present.
MSAliasFile::~MSAliasFile()
{
	if (iFile != NULL) {
		iFile->release();
	}

	if (iBucketCache != NULL) {
		cs_free(iBucketCache);
	}
}
706
707
//------------------------
708
void MSAliasFile::startLoad()
709
{
710
	enter_();
711
	
712
	ASSERT(!iLoading);
713
	
714
//	iBucketCache = (MSADiskBucketRec*) cs_malloc(BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
715
//	memset(iBucketCache, 0, BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
716
	iLoading = true;
717
	
718
	exit_();
719
}
720
721
//------------------------
722
void MSAliasFile::finishLoad()
723
{
724
	enter_();
725
	ASSERT(iLoading);
726
	// Write the bucket cache to disk.
727
//	for (iCurrentBucket && iCurrentBucket->getSize()) {
728
		// To Be implemented.
729
//	}
730
//	cs_free(iBucketCache);
731
	iBucketCache = NULL;
732
	iLoading = false;
733
	exit_();
734
}
735
736
//------------------------
737
void MSAliasFile::returnToPool()
738
{
739
	enter_();
740
	push_(this);
741
	
742
	if (iLoading) {
743
		// If iLoading is still set then probably an exception has been thrown.
744
		try_(a) {
745
			finishLoad();
746
		}
747
		catch_(a) 
748
			iLoading = false;
749
		cont_(a);
750
	}
751
752
	ba_isFileInUse = false;
753
		
754
	if (!ba_share->msa_closing) {
755
		lock_(&ba_share->msa_poolLock);
756
		ba_nextFile = ba_share->msa_pool;
757
		ba_share->msa_pool = this;
758
		unlock_(&ba_share->msa_poolLock);
759
	}
760
		
761
	release_(this);
762
	exit_();
763
}
764
765
//------------------------
766
// The bucket chain is treated as a circular list.
767
bool MSAliasFile::nextBucket(bool with_space)
768
{
769
	bool have_bucket = false;
770
	enter_();
771
	
772
	while (!have_bucket){
773
		if (iCurrentBucket) {
774
			iCurrentBucket = iCurrentBucket->getNextLink();
775
			if (!iCurrentBucket)
776
				iCurrentBucket = iBucketChain->getFront();
777
			if (iCurrentBucket == iStartBucket)
778
				break;
779
		} else {
780
			iCurrentBucket = iBucketChain->getFront();
781
			iStartBucket = iCurrentBucket;
782
		}
783
		
784
		if ((iCurrentBucket->getSize() && !with_space) || (with_space && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET))){
785
			// Only read the portion of the bucket containing records.
786
			iCurrentRec = iCurrentBucket->getEndOfRecords(); // The current record is set just beyond the last valid record.
787
			size_t size = iCurrentRec * sizeof(MSDiskAliasRec);		
788
			iFile->read(iBucket, iCurrentBucket->bi_records_offset, size, size);			
789
			have_bucket = true;
790
		}
791
	}
792
	
793
	return_(have_bucket);
794
}
795
796
//------------------------
797
// Continues the current search: scans the loaded bucket and, while nothing
// is found there, moves on to the next non-empty bucket of the chain.
// Returns the record found, or NULL when the chain has been exhausted.
MSDiskAliasPtr MSAliasFile::nextRec()
{
	MSDiskAliasPtr hit = NULL;
	bool have_rec;
	enter_();

	have_rec = scanBucket();
	while (!have_rec && nextBucket(false))
		have_rec = scanBucket();

	if (have_rec)
		hit = &(iBucket[iCurrentRec]);

	return_(hit);
}
810
811
//------------------------
812
// When starting a search:
813
// If a bucket is already loaded and it is in the correct bucket chain
814
// then search it first. In this case then the search starts at the current
815
// bucket in the chain.
816
//
817
// Searches are from back to front with the idea that the more recently
818
// added objects will be seached for more often and they are more likely
819
// to be at the end of the chain.
820
MSDiskAliasPtr MSAliasFile::findRec(uint32_t hash)
821
{
822
	MSDiskAliasPtr rec = NULL;
823
	MSABucketLinkedList *list = ba_share->getBucketChain(hash);
824
	enter_();
825
	
826
	CS_SET_DISK_4(iDiskHash_4, hash);
827
	if (list == iBucketChain) {
828
		// The search is performed back to front.
829
		iCurrentRec = iCurrentBucket->getEndOfRecords();  // Position the start just beyond the last valid record.
830
		iStartBucket = iCurrentBucket;
831
		if (scanBucket()) {
832
			rec = &(iBucket[iCurrentRec]);
833
			goto done;
834
		}
835
	} else {
836
		iBucketChain = list;
837
		iCurrentBucket = NULL;
838
		iStartBucket = NULL;
839
	}
840
841
	if (nextBucket(false))
842
		rec = nextRec();
843
		
844
done:
845
	return_(rec);
846
}
847
848
//------------------------
849
// Positions the file on the record that has the same hash, repository id and
// repository offset as 'theRec'. Returns true if such a record exists.
bool MSAliasFile::findRec(MSDiskAliasPtr theRec)
{
	MSDiskAliasPtr candidate = NULL;
	bool found = false;
	enter_();

	for (candidate = findRec(CS_GET_DISK_4(theRec->ar_hash_4)); candidate; candidate = nextRec()) {
		if (CS_EQ_DISK_4(candidate->ar_repo_id_4, theRec->ar_repo_id_4) && CS_EQ_DISK_8(candidate->ar_offset_8, theRec->ar_offset_8)) {
			found = true;
			break;
		}
	}

	return_(found);
}
864
865
//------------------------
866
// Adds 'new_rec' to the bucket chain selected by its hash, under the share's
// write lock.
//
// The record goes into a bucket of the chain that still has a free slot,
// either past the last record or into a hole left by a deleted record. When
// every bucket is full a new bucket is linked into the chain; it is taken
// from the share's empty bucket list if possible, otherwise the file grows.
void MSAliasFile::addRec(MSDiskAliasPtr new_rec)
{
	MSABucketLinkedList *list = ba_share->getBucketChain(CS_GET_DISK_4(new_rec->ar_hash_4));
	enter_();
	lock_(&ba_share->msa_writeLock);

	if (iBucketChain != list) {
		iBucketChain = list;
		iCurrentBucket = NULL;
		iStartBucket = NULL;
	} else
		iStartBucket = iCurrentBucket;

	if ((iCurrentBucket && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET)) || nextBucket(true)) { // Find a bucket with space in it for a record.
		uint32_t size = iCurrentBucket->getSize();
		uint32_t end_of_records = iCurrentBucket->getEndOfRecords();

		if (size == end_of_records) { // No holes in the record list
			iCurrentRec = end_of_records;
		} else { // Search for the empty record
			iCurrentRec = end_of_records -2;
			while (iCurrentRec && !CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4))
				iCurrentRec--;

			ASSERT(CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4));
		}

		memcpy(&iBucket[iCurrentRec], new_rec, sizeof(MSDiskAliasRec)); // Add the record to the cached bucket.

		iCurrentBucket->recAdded(iFile, iCurrentRec); // update the current bucket header.
	} else { // A new bucket must be added to the chain.
		MSADiskBucketHeadRec new_bucket = {0};
		CSDiskValue8 disk_8_value;
		uint64_t new_bucket_offset;
		MSABucketInfo *next, *prev;

		next = iBucketChain->getFront();
		prev = iBucketChain->getBack();

		// Set the next and prev bucket offsets in the new bucket record.
		CS_SET_DISK_8(new_bucket.ab_prev_bucket_8, prev->bi_bucket_offset);
		CS_SET_DISK_8(new_bucket.ab_next_bucket_8, next->bi_bucket_offset);

		// The bucket starts out holding exactly the record being added, in
		// slot 0. The disk header and the RAM info record created below must
		// agree on this: one record, end of records just beyond slot 0.
		CS_SET_DISK_4(new_bucket.ab_num_recs_4, 1);
		CS_SET_DISK_4(new_bucket.ab_eor_rec_4, 1);

		if (ba_share->msa_empty_buckets.getSize()) { // Get a bucket from the empty bucket list.
			MSABucketInfo *empty_bucket = ba_share->msa_empty_buckets.removeFront();

			new_bucket_offset = empty_bucket->bi_bucket_offset;
			empty_bucket->release();

			// Update the index file's empty bucket list: its head is now the
			// next bucket on the empty list (not the front of the chain the
			// record is being added to).
			if (ba_share->msa_empty_buckets.getSize() == 0)
				CS_SET_NULL_DISK_8(disk_8_value);
			else
				CS_SET_DISK_8(disk_8_value, ba_share->msa_empty_buckets.getFront()->bi_bucket_offset);

			iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_free_list_8) , 8);
		} else // There are no empty buckets so grow the file.
			new_bucket_offset = ba_share->msa_fileSize;

		// Write the new bucket's record header to the file
		iFile->write(&new_bucket, new_bucket_offset, sizeof(MSADiskBucketHeadRec));

		// Insert the new bucket into the bucket chain on the disk.
		CS_SET_DISK_8(disk_8_value, new_bucket_offset);
		iFile->write(&disk_8_value, prev->bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_next_bucket_8), 8);
		iFile->write(&disk_8_value, next->bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_prev_bucket_8), 8);

		// Update the file size in the file header if required
		if (ba_share->msa_fileSize == new_bucket_offset) {
			ba_share->msa_fileSize += sizeof(MSADiskBucketRec); // Note this is MSADiskBucketRec not MSADiskBucketHeadRec

			CS_SET_DISK_8(disk_8_value, ba_share->msa_fileSize);
			iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_file_size_8) , 8);
		}

		// Add the info rec into the bucket chain in RAM.
		iCurrentBucket = MSABucketInfo::newMSABucketInfo(new_bucket_offset, 1, 1);
		iBucketChain->addFront(iCurrentBucket);
		iCurrentRec = 0;

		// Keep the cached bucket in step with the new current bucket.
		memcpy(&iBucket[iCurrentRec], new_rec, sizeof(MSDiskAliasRec));
	}

	uint64_t offset;
	offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);

	// Write the new index entry to the index file.
	iFile->write(new_rec, offset, sizeof(MSDiskAliasRec));

	unlock_(&ba_share->msa_writeLock);

	exit_();
}
957
//------------------------
958
void MSAliasFile::deleteCurrentRec()
959
{
960
	MSDiskAliasPtr rec = &(iBucket[iCurrentRec]);
961
	uint64_t	offset;
962
	enter_();
963
	
964
	CS_SET_NULL_DISK_4(rec->ar_repo_id_4);
965
	offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
966
	
967
	lock_(&ba_share->msa_writeLock);
968
969
	// Update the index file. It is assumed that repo_id is the first 4 bytes of 'rec'.
970
	iFile->write(rec, offset, 4); 
971
	
972
	iCurrentBucket->recRemoved(iFile, iCurrentRec, iBucket);
973
	
974
	unlock_(&ba_share->msa_writeLock);
975
	
976
	exit_();	
977
}
978
979
//------------------------
980
// Overwrites the record the file is currently positioned on with
// 'update_rec', on disk and in the cached bucket. Only the repository id and
// offset are copied into the cache; the hash must select the same bucket
// chain as before.
void MSAliasFile::updateCurrentRec(MSDiskAliasPtr update_rec)
{
	uint64_t	file_pos;
	enter_();

	// ASSERT that the updated rec still belongs to this bucket chain.
	ASSERT(ba_share->getBucketChain(CS_GET_DISK_4(update_rec->ar_hash_4)) == iBucketChain);
	// We should not be updating a deleted record.
	ASSERT(!CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4));

	lock_(&ba_share->msa_writeLock);
	file_pos = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);

	// Disk first ...
	iFile->write(update_rec, file_pos, sizeof(MSDiskAliasRec));

	// ... then the in-memory copy.
	CS_COPY_DISK_4(iBucket[iCurrentRec].ar_repo_id_4, update_rec->ar_repo_id_4);
	CS_COPY_DISK_8(iBucket[iCurrentRec].ar_offset_8, update_rec->ar_offset_8);

	unlock_(&ba_share->msa_writeLock);
	exit_();
}
1002
1003
1004
//------------------------
1005
// Factory: allocates a bucket info record for the bucket at file offset
// 'offset', holding 'num' records with 'last' as its end-of-records value.
MSABucketInfo *MSABucketInfo::newMSABucketInfo(uint64_t offset, uint32_t num, uint32_t last)
{
	MSABucketInfo *info;

	new_(info, MSABucketInfo(offset, num, last));

	return info;
}
1011
//------------------------
1012
// Updates the bucket's counters after the record at index 'idx' has been
// deleted, and writes the new record count and end-of-records value to the
// bucket header on disk. 'bucket' is the cached copy of the bucket's
// records, used to find the new last valid record.
void MSABucketInfo::recRemoved(CSFile *iFile, uint32_t idx, MSDiskAliasRec bucket[])
{
	MSADiskBucketHeadRec head;
	enter_();

	ASSERT(idx < bi_end_of_records);

	bi_num_recs--;
	if (bi_num_recs == 0) {
		// It would be nice to remove this bucket from the
		// bucket list and place it on the empty list.
		// Before this can be done a locking method would
		// be needed to block anyone from reading this
		// bucket while it was being moved.
		//
		// This has not been done in order to avoid read locks.
		bi_end_of_records = 0;
	} else if (idx == (bi_end_of_records -1)) {
		// The last record was removed: step back over any deleted records
		// to find the new end.
		uint32_t last = idx;

		while (last && CS_IS_NULL_DISK_4(bucket[last].ar_repo_id_4))
			last--;

		if ((last == 0) && CS_IS_NULL_DISK_4(bucket[0].ar_repo_id_4))
			bi_end_of_records = 0;
		else
			bi_end_of_records = last +1;

		ASSERT(bi_end_of_records >= bi_num_recs);
	}

	// Update the index file. Both counters are written with one 8 byte
	// write starting at ab_num_recs_4.
	CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
	CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
	iFile->write(&head.ab_num_recs_4, bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8);
	exit_();
}
1048
1049
//------------------------
1050
// Updates the bucket's counters after a record has been stored at index
// 'idx', and writes the new record count and end-of-records value to the
// bucket header on disk.
void MSABucketInfo::recAdded(CSFile *iFile, uint32_t idx)
{
	MSADiskBucketHeadRec head;
	enter_();

	ASSERT(bi_num_recs < NUM_RECORDS_PER_BUCKET);
	ASSERT(idx < NUM_RECORDS_PER_BUCKET);

	bi_num_recs++;

	// A record appended at the end moves the end; one that fills a hole
	// does not.
	if (bi_end_of_records == idx) {
		bi_end_of_records++;
	}

	// Update the index file. Both counters are written with one 8 byte
	// write starting at ab_num_recs_4.
	CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
	CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
	iFile->write(&head.ab_num_recs_4, bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8);
	exit_();
}
1068
1069
//////////////////////////////////
1070
// Returns an alias file for exclusive use by the caller.
//
// A free file is taken from the front of the pool chain if there is one;
// otherwise a new file is created and added to the list of pool files. The
// file is marked as in use and retained before it is returned.
MSAliasFile *MSAliasFileShare::getPoolFile()
{
	MSAliasFile *picked;
	enter_();

	lock_(&msa_poolLock);
	picked = msa_pool;
	if (picked) {
		// Unlink the head of the free chain.
		msa_pool = picked->ba_nextFile;
	} else {
		new_(picked, MSAliasFile(this));
		msa_poolFiles.addFront(picked);
	}
	unlock_(&msa_poolLock);

	picked->ba_nextFile = NULL;
	ASSERT(!picked->ba_isFileInUse);
	picked->ba_isFileInUse = true;
	picked->retain();

	return_(picked);
}
1091
#endif // HAVE_ALIAS_SUPPORT
1092