1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
30
#include "cslib/CSConfig.h"
34
#include "cslib/CSGlobal.h"
35
#include "cslib/CSStrUtil.h"
36
#include "cslib/CSStorage.h"
38
#include "compactor_ms.h"
39
#include "open_table_ms.h"
40
#include "repository_ms.h"
41
#include "parameters_ms.h"
44
* ---------------------------------------------------------------
48
// Constructs a compactor thread bound to one database.
//   wait_time - forwarded unchanged to the CSDaemon base constructor
//               (the second base argument is NULL).
//   db        - stored in iCompactorDatabase; doWork() uses it to look up,
//               lock and remove repositories.
MSCompactorThread::MSCompactorThread(time_t wait_time, MSDatabase *db):
49
CSDaemon(wait_time, NULL),
50
iCompactorDatabase(db)
54
void MSCompactorThread::close()
58
// Performs one compaction pass for iCompactorDatabase.
//
// Outline, as far as the statements below show:
//   1. Ask the database for a repository that is full of trash (source).
//   2. Lock a destination repository sized for the non-garbage part of the
//      source and open both repository files.
//   3. Walk the source from myRepoHeadSize up to myRepoFileSize, one BLOB
//      record at a time, validating the references stored in each header.
//   4. Copy BLOBs that still have BLOB references to the end of the
//      destination and pass the new location to the referencing tables.
//   5. Flag the source repository for deletion and ask the database to
//      remove it.
bool MSCompactorThread::doWork()
61
MSRepository *src_repo, *dst_repo;
62
MSRepoFile *src_file, *dst_file;
67
uint64_t blob_size, blob_data_size;
69
MSRepoPointersRec ptr;
70
uint32_t table_ref_count;
71
uint32_t blob_ref_count;
80
uint64_t repo_blob_size;
81
uint16_t repo_head_size;
88
// Two ways of fetching the source repository: with MS_COMPACTOR_POLLS the
// lookup is given NULL, the other variant first sets myWaitTime to the
// default compactor wait and passes its address.
#ifdef MS_COMPACTOR_POLLS
89
if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(NULL)))
92
myWaitTime = MS_DEFAULT_COMPACTOR_WAIT * 1000; // Time in milli-seconds
93
if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(&myWaitTime)))
97
src_file = src_repo->openRepoFile();
100
// Size requested for the destination: source file size minus its garbage count.
dst_repo = iCompactorDatabase->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
102
dst_file = dst_repo->openRepoFile();
105
// head is the buffer that later holds one complete BLOB header
// (it is resized with setLength(head_size) before each header read).
new_(head, CSStringBuffer(100));
109
src_repo_id = src_repo->myRepoID;
110
// Scanning starts right after the repository header.
src_offset = src_repo->myRepoHeadSize;
111
//printf("\nCompacting repo %"PRId32"\n\n", src_repo_id);
118
backtopool_(dst_repo);
120
backtopool_(src_repo);
122
myWaitTime = 5 * 1000; // Time in milli-seconds
126
while (src_offset < src_repo->myRepoFileSize) {
134
// A lock is required here because references and dereferences to the
135
// BLOBs can result in the repository record being updated while
136
// it is being copied.
137
// The lock is selected from a fixed-size array, indexed by the record offset.
mylock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
139
// Read the fixed-size part of the BLOB header; a short read is handled separately.
if (src_file->read(&blob, src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
143
// Decode the header fields that were read into blob.
ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
144
ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
145
head_size = CS_GET_DISK_2(blob.rb_head_size_2);
146
blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
147
blob_data_size = CS_GET_DISK_6(blob.rb_blob_data_size_6);
148
status = CS_GET_DISK_1(blob.rb_status_1);
149
// Sanity check: the header must be large enough to hold ref_count
// reference records of ref_size bytes after the fixed part.
if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
150
head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
151
!VALID_BLOB_STATUS(status)) {
152
/* Can't be true. Assume this is garbage! */
157
if (IN_USE_BLOB_STATUS(status)) {
158
// Re-read the complete header (fixed part plus reference records) into head.
head->setLength(head_size);
159
if (src_file->read(head->getBuffer(0), src_offset, head_size, 0) != head_size) {
167
// First pass: step through the reference records that follow the fixed
// header, ref_size bytes at a time.
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
168
for (int count = 0; count < ref_count; count++) {
169
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
170
case MS_BLOB_FREE_REF:
172
case MS_BLOB_TABLE_REF:
173
/* Check the reference: */
174
tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
175
blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
177
otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id);
180
/* Ignore the return value (it will fail because auth_code is wrong!)!! */
181
uint32_t auth_code = 0;
182
otab->getDBTable()->readBlobHandle(otab, blob_id, &auth_code, &repo_id, &repo_offset, &repo_blob_size, &repo_head_size, false);
184
// Compare the values returned for the table handle with this record:
// repository ID, offset, data size and header size must all match.
if (repo_id == src_repo_id &&
185
repo_offset == src_offset &&
186
repo_blob_size == blob_data_size &&
187
repo_head_size == head_size)
190
/* Remove the reference: */
191
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
194
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
196
case MS_BLOB_DELETE_REF:
197
/* These are temporary references from the TempLog file. */
198
/* We try to prevent this from happening, but it can! */
199
uint32_t temp_log_id;
200
uint32_t temp_log_offset;
201
MSTempLogFile *temp_log;
203
temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
204
temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
205
if ((temp_log = iCompactorDatabase->openTempLogFile(temp_log_id, NULL, NULL))) {
206
MSTempLogItemRec log_item;
211
if (temp_log->read(&log_item, temp_log_offset, sizeof(MSTempLogItemRec), 0) == sizeof(MSTempLogItemRec)) {
212
then = CS_GET_DISK_4(log_item.ti_time_4);
214
// The temporary reference is still live while now is earlier than
// the logged time plus the temp BLOB timeout.
if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
215
/* Wait for the BLOB to expire before we continue: */
219
/* Go to sleep until the problem has gone away! */
221
suspendedWait(MSTempLog::adjustWaitTime(then, now));
229
/* Remove the temp reference: */
230
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
233
// er_table_2 is used as a 1-based index into the reference records
// (see the second pass below), so 0 and values above ref_count are invalid.
tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
234
if (tab_index > ref_count || !tab_index) {
235
/* Can't be true. Assume this is garbage! */
243
ptr.rp_chars += ref_size;
246
// The second pass runs only when both table_ref_count and blob_ref_count are non-zero.
if (table_ref_count && blob_ref_count) {
247
/* Check the blob references again to make sure that they
248
* refer to valid table references.
250
MSRepoTableRefPtr tab_ref;
253
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
254
for (int count = 0; count < ref_count; count++) {
255
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
256
case MS_BLOB_FREE_REF:
257
case MS_BLOB_TABLE_REF:
258
case MS_BLOB_DELETE_REF:
260
default: // If it isn't one of the above we assume it is a blob ref. (er_table_2 can never have a value equal to one of the above REF type flags.)
261
// It was already verified above that the index was within range.
262
tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
263
if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
267
ptr.rp_chars += ref_size;
271
// Only BLOBs with a non-zero blob_ref_count are copied to the destination.
if (blob_ref_count) {
274
// The copy is appended at the current end of the destination repository file.
dst_offset = dst_repo->myRepoFileSize;
276
/* Write the header. */
277
dst_file->write(head->getBuffer(0), dst_offset, head_size);
279
/* We have an engine reference, copy the BLOB over: */
280
CSFile::transfer(RETAIN(dst_file), dst_offset + head_size, RETAIN(src_file), src_offset + head_size, blob_size, iCompactBuffer, MS_COMPACTOR_BUFFER_SIZE);
282
#ifdef HAVE_ALIAS_SUPPORT
283
/* If the BLOB has an alias update the alias index. */
284
if (CS_GET_DISK_2(blob.rb_alias_offset_2)) {
285
iCompactorDatabase->moveBlobAlias( src_repo_id, src_offset, CS_GET_DISK_4(blob.rb_alias_hash_4), dst_repo->myRepoID, dst_offset);
288
/* Update the references: */
289
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
290
for (int count = 0; count < ref_count; count++) {
291
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
292
case MS_BLOB_FREE_REF:
294
case MS_BLOB_TABLE_REF:
295
tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
296
blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
298
// For each table reference whose table can be opened, pass the new
// repository ID and offset of the BLOB to that table.
if ((otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id))) {
300
otab->getDBTable()->updateBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, 0);
304
case MS_BLOB_DELETE_REF:
309
ptr.rp_chars += ref_size;
312
// Account for the header and data just appended to the destination.
dst_repo->myRepoFileSize += head_size + blob_size;
317
// Advance to the next BLOB record in the source repository.
src_offset += head_size + blob_size;
320
// Flag the source repository for deletion.
src_repo->mustBeDeleted = true;
326
backtopool_(dst_repo);
328
backtopool_(src_repo);
331
// Hand the source repository ID to the database for removal.
iCompactorDatabase->removeRepo(src_repo_id, &myMustQuit);
338
void *MSCompactorThread::completeWork()