1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
32
#include "CSStrUtil.h"
33
#include "CSStorage.h"
36
#include "Compactor_ms.h"
37
#include "OpenTable_ms.h"
38
#include "Repository_ms.h"
41
* ---------------------------------------------------------------
45
// Constructs the compactor thread for the database `db`.
// wait_time is forwarded to the CSDaemon base class (together with a NULL
// second argument) and db is stored in iCompactorDatabase, which doWork()
// later uses to find, lock and remove repositories.
// NOTE(review): the constructor body is not visible in this extract.
MSCompactorThread::MSCompactorThread(time_t wait_time, MSDatabase *db):
46
CSDaemon(wait_time, NULL),
47
iCompactorDatabase(db)
51
MSCompactorThread::~MSCompactorThread()
56
void MSCompactorThread::close()
60
// Performs one compaction pass for iCompactorDatabase.
//
// As far as the visible code shows, the pass:
//   1. obtains a source repository via getRepoFullOfTrash() and a destination
//      repository via lockRepo(), and opens a file on each;
//   2. walks the source repository record by record, validating each BLOB
//      header and its reference table;
//   3. copies every record that still has a blob reference (blob_ref_count)
//      to the end of the destination repository and updates the table BLOB
//      handles to the new repo ID and offset;
//   4. marks the source repository with mustBeDeleted and removes it from the
//      database.
//
// NOTE(review): a number of lines of this function (braces, some declarations,
// error paths, the return statements) are not visible in this extract, so the
// meaning of the bool return value cannot be confirmed from here.
bool MSCompactorThread::doWork()
63
MSRepository *src_repo, *dst_repo;
64
MSRepoFile *src_file, *dst_file;
69
uint64_t blob_size, blob_data_size;
71
MSRepoPointersRec ptr;
72
u_int table_ref_count;
82
uint64_t repo_blob_size;
83
uint16_t repo_head_size;
90
// The way the source repo is requested depends on MS_COMPACTOR_POLLS: with it
// defined the lookup is called with NULL; the other call resets myWaitTime to
// the default and passes its address (presumably the #else branch, which is
// not visible here).
#ifdef MS_COMPACTOR_POLLS
91
if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(NULL)))
94
myWaitTime = MS_DEFAULT_COMPACTOR_WAIT * 1000; // Time in milli-seconds
95
if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(&myWaitTime)))
99
src_file = src_repo->openRepoFile();
102
// Request a destination repo; the size passed is the source file size minus
// its garbage count.
dst_repo = iCompactorDatabase->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
104
dst_file = dst_repo->openRepoFile();
107
// Scratch buffer that receives one complete BLOB header at a time (it is
// resized with setLength(head_size) for each record below).
new_(head, CSStringBuffer(100));
111
src_repo_id = src_repo->myRepoID;
112
// Scanning starts at offset myRepoHeadSize of the source repository.
src_offset = src_repo->myRepoHeadSize;
113
//printf("\nCompacting repo %"PRId32"\n\n", src_repo_id);
120
backtopool_(dst_repo);
122
backtopool_(src_repo);
124
myWaitTime = 5 * 1000; // Time in milli-seconds
128
// Main loop: one iteration per record until the end of the source file.
while (src_offset < src_repo->myRepoFileSize) {
136
// A lock is required here because references and dereferences to the
137
// BLOBs can result in the repository record being updated while
138
// it is being copied.
139
// The lock is selected from a fixed-size array by the record offset modulo
// CS_REPO_REC_LOCK_COUNT.
lock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
141
// Read the fixed-size part of the BLOB header (myRepoBlobHeadSize bytes) at
// src_offset; the branch taken on a short read is not visible here.
if (src_file->read(&blob, src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
145
// Decode the header fields from their on-disk representation.
ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
146
ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
147
head_size = CS_GET_DISK_2(blob.rb_head_size_2);
148
blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
149
blob_data_size = CS_GET_DISK_6(blob.rb_blob_data_size_6);
150
status = CS_GET_DISK_1(blob.rb_status_1);
151
// Sanity check: no data, no references, a zero reference size, a header too
// small to hold ref_count references of ref_size bytes, or an unknown status
// means this cannot be a valid record.
if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
152
head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
153
!VALID_BLOB_STATUS(status)) {
154
/* Can't be true. Assume this is garbage! */
159
if (IN_USE_BLOB_STATUS(status)) {
160
// Re-read the record's complete header (head_size bytes: fixed part plus the
// reference table) into the scratch buffer.
head->setLength(head_size);
161
if (src_file->read(head->getBuffer(0), src_offset, head_size, 0) != head_size) {
169
// First pass over the reference table, which starts right after the fixed
// header; entries are ref_size bytes apart.
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
170
for (int count = 0; count < ref_count; count++) {
171
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
172
case MS_BLOB_FREE_REF:
174
case MS_BLOB_TABLE_REF:
175
/* Check the reference: */
176
tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
177
blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
179
otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id);
182
/* Ignore the return value (it will fail because auth_code is wrong!)!! */
183
uint32_t auth_code = 0;
184
otab->getDBTable()->readBlobHandle(otab, blob_id, &auth_code, &repo_id, &repo_offset, &repo_blob_size, &repo_head_size, false);
186
// Compare what the table's BLOB handle records (repo ID, offset, data size
// and header size) with the record currently being examined.
if (repo_id == src_repo_id &&
187
repo_offset == src_offset &&
188
repo_blob_size == blob_data_size &&
189
repo_head_size == head_size)
192
/* Remove the reference: */
193
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
196
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
198
case MS_BLOB_DELETE_REF:
199
/* These are temporary references from the TempLog file. */
200
/* We try to prevent this from happening, but it can! */
201
uint32_t temp_log_id;
202
uint32_t temp_log_offset;
203
MSTempLogFile *temp_log;
205
temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
206
temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
207
if ((temp_log = iCompactorDatabase->openTempLogFile(temp_log_id, NULL, NULL))) {
208
MSTempLogItemRec log_item;
213
// Only a full-size read of the log item is trusted.
if (temp_log->read(&log_item, temp_log_offset, sizeof(MSTempLogItemRec), 0) == sizeof(MSTempLogItemRec)) {
214
then = CS_GET_DISK_4(log_item.ti_time_4);
216
// The temporary reference has not yet timed out while
// now < then + gTempBlobTimeout.
if (now < then + MSTempLog::gTempBlobTimeout) {
217
/* Wait for the BLOB to expire before we continue: */
221
/* Go to sleep until the problem has gone away! */
223
suspendedWait(MSTempLog::adjustWaitTime(then, now));
231
/* Remove the temp reference: */
232
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
235
// A blob reference stores in er_table_2 a 1-based index into this reference
// table; zero or an index beyond ref_count is rejected below.
tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
236
if (tab_index > ref_count || !tab_index) {
237
/* Can't be true. Assume this is garbage! */
245
ptr.rp_chars += ref_size;
248
if (table_ref_count && blob_ref_count) {
249
/* Check the blob references again to make sure that they
250
* refer to valid table references.
252
MSRepoTableRefPtr tab_ref;
255
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
256
for (int count = 0; count < ref_count; count++) {
257
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
258
case MS_BLOB_FREE_REF:
259
case MS_BLOB_TABLE_REF:
260
case MS_BLOB_DELETE_REF:
262
default: // If it isn't one of the above we assume it is a blob ref. (er_table_2 can never have a value equal to one of the above REF type flags.)
263
// It was already verified above that the index was within range.
264
tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
265
if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
269
ptr.rp_chars += ref_size;
273
if (blob_ref_count) {
276
dst_offset = dst_repo->myRepoFileSize;
278
/* Write the header. */
279
dst_file->write(head->getBuffer(0), dst_offset, head_size);
281
/* We have an engine reference, copy the BLOB over: */
282
// The data follows the header in both files; blob_size bytes are copied
// through iCompactBuffer.
CSFile::transfer(dst_file, dst_offset + head_size, src_file, src_offset + head_size, blob_size, iCompactBuffer, MS_COMPACTOR_BUFFER_SIZE);
284
#ifdef HAVE_ALIAS_SUPPORT
285
/* If the BLOB has an alias update the alias index. */
286
if (CS_GET_DISK_2(blob.rb_alias_offset_2)) {
287
iCompactorDatabase->moveBlobAlias( src_repo_id, src_offset, CS_GET_DISK_4(blob.rb_alias_hash_4), dst_repo->myRepoID, dst_offset);
290
/* Update the references: */
291
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
292
for (int count = 0; count < ref_count; count++) {
293
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
294
case MS_BLOB_FREE_REF:
296
case MS_BLOB_TABLE_REF:
297
tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
298
blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
300
if ((otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id))) {
302
// Point the table's BLOB handle at the record's new repo ID and offset.
otab->getDBTable()->updateBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, 0);
306
case MS_BLOB_DELETE_REF:
311
ptr.rp_chars += ref_size;
314
// The destination repository grew by one complete record (header + data).
dst_repo->myRepoFileSize += head_size + blob_size;
319
// Advance to the next record in the source repository.
src_offset += head_size + blob_size;
322
// Flag the source repository for deletion.
src_repo->mustBeDeleted = true;
328
backtopool_(dst_repo);
330
backtopool_(src_repo);
333
// Remove the source repository from the database. myMustQuit is passed by
// address -- presumably so that the removal can notice a shutdown request
// (TODO confirm against MSDatabase::removeRepo).
iCompactorDatabase->removeRepo(src_repo_id, &myMustQuit);
340
void *MSCompactorThread::finalize()