1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
30
#include "cslib/CSConfig.h"
32
#include "cslib/CSGlobal.h"
33
#include "cslib/CSStrUtil.h"
34
#include "cslib/CSStorage.h"
37
#include "compactor_ms.h"
38
#include "open_table_ms.h"
39
#include "repository_ms.h"
40
#include "parameters_ms.h"
43
* ---------------------------------------------------------------
47
// Constructor: forwards wait_time (with a NULL second argument) to the
// CSDaemon base class and stores db in iCompactorDatabase, the database
// this thread operates on.
MSCompactorThread::MSCompactorThread(time_t wait_time, MSDatabase *db):
48
CSDaemon(wait_time, NULL),
49
iCompactorDatabase(db)
53
void MSCompactorThread::close()
57
// One compaction pass over a single repository.
//
// A source repository is obtained from getRepoFullOfTrash() and a
// destination repository from lockRepo(). The source file is then walked
// record by record: every reference stored in a BLOB header is validated,
// stale references are marked MS_BLOB_FREE_REF, and BLOBs that still have
// a BLOB reference are copied to the end of the destination file, after
// which the owning tables' handles are updated to the new location.
// Finally the source repository is flagged with mustBeDeleted and removed
// from the database.
bool MSCompactorThread::doWork()
60
// Source (being compacted) and destination (receiving live BLOBs) repositories.
MSRepository *src_repo, *dst_repo;
61
// Open files on the two repositories above.
MSRepoFile *src_file, *dst_file;
66
// Sizes decoded from the BLOB header (rb_blob_repo_size_6 and rb_blob_data_size_6).
uint64_t blob_size, blob_data_size;
68
// Cursor over the reference entries that follow the fixed BLOB header.
MSRepoPointersRec ptr;
69
// Counters consulted after the first validation pass over the references.
uint32_t table_ref_count;
70
uint32_t blob_ref_count;
79
// Output values filled in by readBlobHandle() for comparison with this record.
uint64_t repo_blob_size;
80
uint16_t repo_head_size;
87
// Two variants of the source lookup follow: the first passes NULL as the
// wait-time argument, the second resets myWaitTime to
// MS_DEFAULT_COMPACTOR_WAIT * 1000 and passes its address.
#ifdef MS_COMPACTOR_POLLS
88
if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(NULL)))
91
myWaitTime = MS_DEFAULT_COMPACTOR_WAIT * 1000; // Time in milli-seconds
92
if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(&myWaitTime)))
96
src_file = src_repo->openRepoFile();
99
// Request a destination repository; the argument is the source file size
// minus its garbage count (presumably the space the live data needs -
// confirm against lockRepo()).
dst_repo = iCompactorDatabase->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
101
dst_file = dst_repo->openRepoFile();
104
// Buffer used below to hold a full BLOB header including its reference
// entries; it is resized per record with setLength(head_size).
new_(head, CSStringBuffer(100));
108
// Remember the source ID and begin scanning at offset myRepoHeadSize.
src_repo_id = src_repo->myRepoID;
109
src_offset = src_repo->myRepoHeadSize;
110
//printf("\nCompacting repo %"PRId32"\n\n", src_repo_id);
117
// Both repositories are handed back and the wait time is set to 5 seconds.
// NOTE(review): the condition guarding this early hand-back is not evident
// from the surrounding statements - confirm when this path is taken.
backtopool_(dst_repo);
119
backtopool_(src_repo);
121
myWaitTime = 5 * 1000; // Time in milli-seconds
125
// Walk the source file one BLOB record at a time until its end.
while (src_offset < src_repo->myRepoFileSize) {
133
// A lock is required here because references and dereferences to the
134
// BLOBs can result in the repository record being updated while
135
// it is being copied.
136
// The lock is picked from myRepoLock by the record offset modulo
// CS_REPO_REC_LOCK_COUNT.
mylock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
138
// Read the fixed-size part of the BLOB header; a short read is handled
// by the branch opened here.
if (src_file->read(&blob, src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
142
// Decode the on-disk header fields.
ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
143
ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
144
head_size = CS_GET_DISK_2(blob.rb_head_size_2);
145
blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
146
blob_data_size = CS_GET_DISK_6(blob.rb_blob_data_size_6);
147
status = CS_GET_DISK_1(blob.rb_status_1);
148
// Sanity check: a zero data size, a non-positive reference count, a zero
// reference size, a header too small to hold ref_count entries of
// ref_size bytes, or an invalid status means the record is not trusted.
if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
149
head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
150
!VALID_BLOB_STATUS(status)) {
151
/* Can't be true. Assume this is garbage! */
156
// Only records whose status passes IN_USE_BLOB_STATUS are examined further.
if (IN_USE_BLOB_STATUS(status)) {
157
// Re-read the complete header, reference entries included, into head.
head->setLength(head_size);
158
if (src_file->read(head->getBuffer(0), src_offset, head_size, 0) != head_size) {
166
// First pass: start at the first reference entry, which follows the
// fixed header, and validate each entry by its type.
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
167
for (int count = 0; count < ref_count; count++) {
168
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
169
case MS_BLOB_FREE_REF:
171
case MS_BLOB_TABLE_REF:
172
/* Check the reference: */
173
tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
174
blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
176
// Look up the open table this reference names.
otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id);
179
/* Ignore the return value (it will fail because auth_code is wrong!)!! */
180
uint32_t auth_code = 0;
181
// Fetch the location the table has recorded for this BLOB ID.
otab->getDBTable()->readBlobHandle(otab, blob_id, &auth_code, &repo_id, &repo_offset, &repo_blob_size, &repo_head_size, false);
183
// Compare the table's handle with this record: repository ID, offset,
// data size and header size must all match.
if (repo_id == src_repo_id &&
184
repo_offset == src_offset &&
185
repo_blob_size == blob_data_size &&
186
repo_head_size == head_size)
189
/* Remove the reference: */
190
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
193
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
195
case MS_BLOB_DELETE_REF:
196
/* These are temporary references from the TempLog file. */
197
/* We try to prevent this from happening, but it can! */
198
uint32_t temp_log_id;
199
uint32_t temp_log_offset;
200
MSTempLogFile *temp_log;
202
// Locate the temp-log file and offset this reference points at.
temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
203
temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
204
if ((temp_log = iCompactorDatabase->openTempLogFile(temp_log_id, NULL, NULL))) {
205
MSTempLogItemRec log_item;
210
// Read the log item and take its timestamp; if the temp BLOB timeout
// has not yet elapsed relative to now, the thread waits.
if (temp_log->read(&log_item, temp_log_offset, sizeof(MSTempLogItemRec), 0) == sizeof(MSTempLogItemRec)) {
211
then = CS_GET_DISK_4(log_item.ti_time_4);
213
if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
214
/* Wait for the BLOB to expire before we continue: */
218
/* Go to sleep until the problem has gone away! */
220
suspendedWait(MSTempLog::adjustWaitTime(then, now));
228
/* Remove the temp reference: */
229
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
232
// Remaining entries are read as BLOB references: er_table_2 is used as a
// 1-based index into the reference entries, so zero or a value above
// ref_count is rejected.
tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
233
if (tab_index > ref_count || !tab_index) {
234
/* Can't be true. Assume this is garbage! */
242
// Advance to the next reference entry.
ptr.rp_chars += ref_size;
245
// The second pass is needed only when both counters are non-zero.
if (table_ref_count && blob_ref_count) {
246
/* Check the blob references again to make sure that they
247
* refer to valid table references.
249
MSRepoTableRefPtr tab_ref;
252
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
253
for (int count = 0; count < ref_count; count++) {
254
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
255
case MS_BLOB_FREE_REF:
256
case MS_BLOB_TABLE_REF:
257
case MS_BLOB_DELETE_REF:
259
default: // If it isn't one of the above we assume it is a blob ref. (er_table_2 can never have a value equal to one of the above REF type flags.)
260
// It was already verified above that the index was within range.
261
tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
262
if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
266
ptr.rp_chars += ref_size;
270
if (blob_ref_count) {
273
dst_offset = dst_repo->myRepoFileSize;
275
/* Write the header. */
276
dst_file->write(head->getBuffer(0), dst_offset, head_size);
278
/* We have an engine reference, copy the BLOB over: */
279
CSFile::transfer(dst_file, dst_offset + head_size, src_file, src_offset + head_size, blob_size, iCompactBuffer, MS_COMPACTOR_BUFFER_SIZE);
281
#ifdef HAVE_ALIAS_SUPPORT
282
/* If the BLOB has an alias update the alias index. */
283
if (CS_GET_DISK_2(blob.rb_alias_offset_2)) {
284
iCompactorDatabase->moveBlobAlias( src_repo_id, src_offset, CS_GET_DISK_4(blob.rb_alias_hash_4), dst_repo->myRepoID, dst_offset);
287
/* Update the references: */
288
// For each table reference, repoint the table's handle for this BLOB at
// the destination repository ID and offset.
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
289
for (int count = 0; count < ref_count; count++) {
290
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
291
case MS_BLOB_FREE_REF:
293
case MS_BLOB_TABLE_REF:
294
tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
295
blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
297
if ((otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id))) {
299
otab->getDBTable()->updateBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, 0);
303
case MS_BLOB_DELETE_REF:
308
ptr.rp_chars += ref_size;
311
// Account for the header and BLOB bytes just appended to the destination.
dst_repo->myRepoFileSize += head_size + blob_size;
316
// Step over this record (header plus BLOB data) in the source file.
src_offset += head_size + blob_size;
319
// Flag the source repository for deletion, hand both repositories back,
// then remove the source from the database (myMustQuit is passed by address).
src_repo->mustBeDeleted = true;
325
backtopool_(dst_repo);
327
backtopool_(src_repo);
330
iCompactorDatabase->removeRepo(src_repo_id, &myMustQuit);
337
void *MSCompactorThread::completeWork()