1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
27
* The backup is done by creating a new database with the same name and ID in the
28
* backup location. Then the pbms_dump table in the source database is initialized
29
* for a sequential scan for backup. This has the effect of locking all current repository
30
* files. Then the equivalent of 'insert into dst_db.pbms_dump (select * from src_db.pbms_dump);'
37
#include <drizzled/common.h>
38
#include <drizzled/session.h>
39
#include <drizzled/table.h>
40
#include <drizzled/message/table.pb.h>
41
#include <drizzled/charset.h>
42
#include <drizzled/table_proto.h>
43
#include <drizzled/field.h>
44
#include <drizzled/field/varstring.h>
47
#include "cslib/CSConfig.h"
49
#include <sys/types.h>
52
#include "cslib/CSGlobal.h"
53
#include "cslib/CSStrUtil.h"
54
#include "cslib/CSStorage.h"
57
#include "system_table_ms.h"
58
#include "open_table_ms.h"
60
#include "database_ms.h"
61
#include "repository_ms.h"
62
#include "backup_ms.h"
63
#include "transaction_ms.h"
64
#include "systab_variable_ms.h"
65
#include "systab_backup_ms.h"
67
// Next backup reference id to hand out; MSBackupInfo::startDump() post-increments it.
uint32_t MSBackupInfo::gMaxInfoRef;
68
// Registry of MSBackupInfo objects keyed by backup reference id
// (entries are added with set() and dropped with remove() further down in this file).
CSSyncSparseArray *MSBackupInfo::gBackupInfo;
70
//==========================================
71
// Constructs a backup-info record.
// The visible initializers store the cloud reference and cloud backup number;
// the body copies the database name into db_name and, only when a non-empty
// location string is supplied, copies it into backupLocation.
// NOTE(review): part of the parameter list, the initializer list and the braces
// are on lines that are not visible in this excerpt.
MSBackupInfo::MSBackupInfo( uint32_t id,
78
uint32_t cloudRef_arg,
79
uint32_t cloudBackupNo_arg ):
88
cloudRef(cloudRef_arg),
89
cloudBackupNo(cloudBackupNo_arg)
91
db_name = CSString::newString(name);
92
// A NULL or empty location leaves backupLocation unassigned here.
if (location && *location)
93
backupLocation = CSString::newString(location);
96
//-------------------------------
97
// Destructor: releases the backupLocation string.
// NOTE(review): the constructor only assigns backupLocation for a non-empty
// location, so this release is presumably guarded by a null check on a line
// that is not visible in this excerpt - confirm.
MSBackupInfo::~MSBackupInfo()
103
backupLocation->release();
106
//-------------------------------
107
// Starts a backup of the database identified by db_id.
// Looks up the source database, records the start time, hands a retained
// reference to this info object to the source database via startBackup(),
// and then saves the backup table through pbms_db.
// NOTE(review): several statements of this function are not visible in this excerpt.
void MSBackupInfo::startBackup(MSDatabase *pbms_db)
114
src_db = MSDatabase::getDatabase(db_id);
117
startTime = time(NULL);
119
src_db->startBackup(RETAIN(this));
125
MSBackupTable::saveTable(pbms_db);
129
//-------------------------------
130
// Helper object created by MSBackupInfo::startDump() to undo its registration
// after a possible exception.
// The visible cleanup action removes the entry ref_id from MSBackupInfo::gBackupInfo;
// setCleanUp(id) receives the reference id to act on.
// NOTE(review): most of the class body (members, destructor, cancelCleanUp) is
// not visible in this excerpt; presumably the removal runs from the destructor
// unless the cleanup was cancelled - confirm.
class StartDumpCleanUp : public CSRefObject {
136
StartDumpCleanUp(): CSRefObject(),
142
MSBackupInfo::gBackupInfo->remove(ref_id);
146
void setCleanUp(uint32_t id)
159
// Registers a new backup-info record for a dump of database db.
// db        - database being dumped; its name and id are copied into the record.
// cloud_ref - cloud reference stored in the record.
// backup_no - cloud backup number stored in the record.
// Returns a pointer to an MSBackupInfo (the return statement is not visible in
// this excerpt; presumably the newly created record - confirm).
// The record is stored in gBackupInfo under a freshly allocated reference id,
// marked as running, and then written out with MSBackupTable::saveTable().
MSBackupInfo *MSBackupInfo::startDump(MSDatabase *db, uint32_t cloud_ref, uint32_t backup_no)
163
StartDumpCleanUp *cleanup;
169
// Allocate the next reference id.
ref_id = gMaxInfoRef++;
170
new_(info, MSBackupInfo(ref_id, db->myDatabaseName->getCString(), db->myDatabaseID, time(NULL), 0, true, NULL, cloud_ref, backup_no));
173
gBackupInfo->set(ref_id, RETAIN(info));
175
info->isRunning = true;
178
// NOTE(review): the matching lock of gBackupInfo is presumably on a line not visible here.
unlock_(gBackupInfo);
181
// Create a cleanup object to handle cleanup
182
// after a possible exception.
183
new_(cleanup, StartDumpCleanUp());
185
cleanup->setCleanUp(ref_id);
187
MSBackupTable::saveTable(RETAIN(db));
189
// The table was saved without an exception, so the registration is kept.
cleanup->cancelCleanUp();
197
//-------------------------------
198
// Marks the backup as finished: records the completion time and saves the
// backup table through db.
// NOTE(review): further statements may exist on lines not visible in this excerpt.
void MSBackupInfo::backupCompleted(MSDatabase *db)
200
completionTime = time(NULL);
202
MSBackupTable::saveTable(db);
205
//-------------------------------
206
// Handles a backup that did not complete: removes this record (backupRefId)
// from the gBackupInfo registry and saves the backup table through db.
// NOTE(review): the lock matching the unlock below is presumably taken on a
// line not visible in this excerpt.
void MSBackupInfo::backupTerminated(MSDatabase *db)
212
gBackupInfo->remove(backupRefId);
213
unlock_(gBackupInfo);
216
MSBackupTable::saveTable(db);
220
//==========================================
221
// Default constructor: the visible initializers start the object as not running,
// in state BU_COMPLETED, with no source database and with the transaction
// manager not suspended.
// NOTE(review): additional initializers are on lines not visible in this excerpt.
MSBackup::MSBackup():
226
bu_BackupRunning(false),
227
bu_State(BU_COMPLETED),
228
bu_SourceDatabase(NULL),
236
bu_TransactionManagerSuspended(false)
240
// Factory: creates an MSBackup object and attaches the backup database obtained
// from MSDatabase::getBackupDatabase() for the location, database name and
// database id held in info.
// NOTE(review): the remaining setup and the return statement are not visible
// in this excerpt.
MSBackup *MSBackup::newMSBackup(MSBackupInfo *info)
247
new_(bu, MSBackup());
249
bu->bu_Database = MSDatabase::getBackupDatabase(RETAIN(info->backupLocation), RETAIN(info->db_name), info->db_id, true);
258
//-------------------------------
259
// Helper object created by MSBackup::startBackup() to undo a partially started
// backup after a possible exception.
// The visible cleanup action calls completeBackup() on the stored MSBackup;
// setCleanUp(bup) receives the backup object to act on.
// NOTE(review): members, guards and cancelCleanUp are on lines not visible in
// this excerpt; the completeBackup() call appears to belong to the destructor - confirm.
class StartBackupCleanUp : public CSRefObject {
265
StartBackupCleanUp(): CSRefObject(),
268
~StartBackupCleanUp()
271
backup->completeBackup();
275
void setCleanUp(MSBackup *bup)
288
// Prepares a backup of src_db and leaves the object ready for the backup thread.
// Visible steps, in order:
//   1. install a StartBackupCleanUp object for the exception path;
//   2. suspend the compactor thread of the source database;
//   3. collect every repository that is neither being removed nor marked for
//      deletion into bu_BackupList, calling initBackup() on each and adding its
//      file size to bu_size;
//   4. copy the table list and the PBMS system tables to the backup database;
//   5. store the backup reference id in the backup database variable table;
//   6. resume the compactor unless a repository is being compacted;
//   7. suspend the transaction manager and stamp bu_ID / bu_start_time;
//   8. cancel the cleanup object.
// NOTE(review): many lines of this function (braces, locks, declarations) are
// not visible in this excerpt.
void MSBackup::startBackup(MSDatabase *src_db)
290
CSSyncVector *repo_list;
291
bool compacting = false;
293
StartBackupCleanUp *cleanup;
296
// Create a cleanup object to handle cleanup
297
// after a possible exception.
298
new_(cleanup, StartBackupCleanUp());
300
cleanup->setCleanUp(this);
302
bu_SourceDatabase = src_db;
303
repo_list = bu_SourceDatabase->getRepositoryList();
304
// Suspend the compactor before locking the list.
305
bu_Compactor = bu_SourceDatabase->getCompactorThread();
307
bu_Compactor->retain();
308
bu_Compactor->suspend();
311
// Build the list of repositories to be backed up.
314
new_(bu_BackupList, CSVector(repo_list->size()));
315
for (uint32_t i = 0; i<repo_list->size(); i++) {
316
if ((repo = (MSRepository *) repo_list->get(i))) {
317
if (!repo->isRemovingFP && !repo->mustBeDeleted) {
318
bu_BackupList->add(RETAIN(repo));
319
if (repo->initBackup() == REPO_COMPACTING)
322
if (!repo->myRepoHeadSize) {
323
/* The file has not yet been opened, so the
324
* garbage count will not be known!
326
MSRepoFile *repo_file;
329
//unlock_(myRepostoryList);
331
// Opening and immediately releasing the file is done only for its side effect on the repository.
repo_file = repo->openRepoFile();
332
repo_file->release();
334
//lock_(myRepostoryList);
338
bu_size += repo->myRepoFileSize;
344
// Copy the table list to the backup database:
345
uint32_t next_tab = 0;
347
while ((tab = bu_SourceDatabase->getNextTable(&next_tab))) {
349
bu_Database->addTable(tab->myTableID, tab->myTableName->getCString(), 0, false);
354
// Copy over any physical PBMS system tables.
355
PBMSSystemTables::transferSystemTables(RETAIN(bu_Database), RETAIN(bu_SourceDatabase));
357
// Load the system tables into the backup database. This will
358
// initialize the database for cloud storage if required.
359
PBMSSystemTables::loadSystemTables(RETAIN(bu_Database));
361
// Set the cloud backup info.
362
bu_Database->myBlobCloud->cl_setBackupInfo(RETAIN(bu_info));
365
// Set the backup number in the pbms_variable table. (This is a hidden value.)
366
// This value is used in case a drag and drop restore was done. When a database is
367
// first loaded this value is checked and if it is not zero then the backup record
368
// will be read and used to recover any BLOBs.
371
// NOTE(review): C++11 and later parse a string literal directly followed by an
// identifier as a user-defined-literal suffix, so the PRIu32 macro in the format
// string below should be separated from the adjacent literals by spaces - confirm
// against the compiler settings used by the project.
snprintf(value, 20, "%"PRIu32"", bu_info->getBackupRefId());
372
MSVariableTable::setVariable(RETAIN(bu_Database), BACKUP_NUMBER_VAR, value);
374
// Once the repositories are locked the compactor can be restarted
375
// unless it is in the process of compacting a repository that is
377
if (bu_Compactor && !compacting) {
378
bu_Compactor->resume();
379
bu_Compactor->release();
383
// Suspend the transaction writer while the backup is running.
384
MSTransactionManager::suspend(true);
385
bu_TransactionManagerSuspended = true;
387
// Start the backup daemon thread.
388
bu_ID = bu_start_time = time(NULL);
391
// Setup succeeded, so the exception-path cleanup is cancelled.
cleanup->cancelCleanUp();
397
// Tears down a backup, whether it completed or not.
// Visible steps, in order: resume the transaction manager if this backup
// suspended it; call backupCompleted() on every repository still in
// bu_BackupList and release the list; resume and release the compactor;
// release the backup database when bu_State is BU_COMPLETED (the dropDatabase
// call is presumably the alternative branch - confirm); report completion or
// termination to bu_info; finally clear bu_SourceDatabase and bu_BackupRunning.
// NOTE(review): guards, else keywords and braces are on lines not visible in
// this excerpt.
void MSBackup::completeBackup()
399
if (bu_TransactionManagerSuspended) {
400
MSTransactionManager::resume();
401
bu_TransactionManagerSuspended = false;
407
// Drain the list from the front until it is empty.
while (bu_BackupList->size()) {
408
repo = (MSRepository *) bu_BackupList->take(0);
410
repo->backupCompleted();
414
bu_BackupList->release();
415
bu_BackupList = NULL;
419
bu_Compactor->resume();
420
bu_Compactor->release();
425
if (bu_State == BU_COMPLETED)
426
bu_Database->releaseBackupDatabase();
428
MSDatabase::dropDatabase(bu_Database);
433
if (bu_SourceDatabase){
434
if (bu_State == BU_COMPLETED)
435
bu_info->backupCompleted(bu_SourceDatabase);
437
bu_info->backupTerminated(bu_SourceDatabase);
439
bu_SourceDatabase = NULL;
444
bu_BackupRunning = false;
447
// Worker body of the backup: walks bu_BackupList and copies each source
// repository into a repository of the backup database (bu_Database).
// For every record of a source repository file the visible code:
//   - reads and decodes the BLOB header;
//   - clamps a modification time later than bu_start_time to bu_start_time;
//   - turns an MS_BLOB_MOVED record that belongs to this backup (bu_ID) back
//     into MS_BLOB_REFERENCED;
//   - scans the reference table, marking temporary delete references as free
//     and handling BLOB references according to their commit state;
//   - when both table and BLOB references remain, writes the header and the
//     BLOB data to the destination and updates the BLOB handle of each
//     referencing table that can be opened in the backup database.
// Progress is accumulated in bu_completed, and the outer loop stops early when
// myMustQuit is set.
// NOTE(review): the condition that selects BU_TERMINATED versus BU_COMPLETED,
// the return value, the try/catch structure and many braces are on lines not
// visible in this excerpt.
bool MSBackup::doWork()
452
MSRepository *src_repo, *dst_repo;
453
MSRepoFile *src_file, *dst_file;
454
off64_t src_offset, prev_offset;
456
uint64_t blob_size, blob_data_size;
457
CSStringBuffer *head;
458
MSRepoPointersRec ptr;
459
uint32_t table_ref_count;
460
uint32_t blob_ref_count;
468
uint8_t blob_storage_type;
471
char *transferBuffer;
472
CloudKeyRec cloud_key;
475
bu_BackupRunning = true;
476
bu_State = BU_RUNNING;
482
myWaitTime = 5 * 1000; // Time in milli-seconds
488
// Scratch buffer handed to CSFile::transfer() when copying BLOB data.
transferBuffer = (char*) cs_malloc(MS_BACKUP_BUFFER_SIZE);
489
push_ptr_(transferBuffer);
491
new_(head, CSStringBuffer(100));
494
src_repo = (MSRepository*)bu_BackupList->get(0);
495
while (src_repo && !myMustQuit) {
497
src_file = src_repo->openRepoFile();
500
// The destination repository is requested with the source file size minus its garbage count.
dst_repo = bu_Database->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
502
dst_file = dst_repo->openRepoFile();
505
// Records start right after the repository header.
src_offset = src_repo->myRepoHeadSize;
507
while (src_offset < src_repo->myRepoFileSize) {
510
bu_completed += src_offset - prev_offset;
511
prev_offset = src_offset;
517
// A lock is required here because references and dereferences to the
518
// BLOBs can result in the repository record being updated while
519
// it is being copied.
520
my_lock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
522
head->setLength(src_repo->myRepoBlobHeadSize);
523
if (src_file->read(head->getBuffer(0), src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
528
// Decode the fixed-size BLOB header fields from the on-disk record.
ptr.rp_chars = head->getBuffer(0);
529
ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
530
ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
531
head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
532
blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
533
blob_data_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_data_size_6);
534
auth_code = CS_GET_DISK_4(ptr.rp_head->rb_auth_code_4);
535
status = CS_GET_DISK_1(ptr.rp_head->rb_status_1);
536
mod_time = CS_GET_DISK_4(ptr.rp_head->rb_mod_time_4);
538
blob_storage_type = CS_GET_DISK_1(ptr.rp_head->rb_storage_type_1);
539
if (blob_storage_type == MS_CLOUD_STORAGE) {
540
MSRepoFile::getBlobKey(ptr.rp_head, &cloud_key);
543
// If the BLOB was modified after the start of the backup
544
// then set the mod time to the backup time to ensure that
545
// a backup for update will work correctly.
546
if (mod_time > bu_start_time)
547
CS_SET_DISK_4(ptr.rp_head->rb_mod_time_4, bu_start_time);
549
// If the BLOB was moved during the time of this backup then copy
550
// it to the backup location as a referenced BLOB.
551
if ((status == MS_BLOB_MOVED) && (bu_ID == (uint32_t) CS_GET_DISK_4(ptr.rp_head->rb_backup_id_4))) {
552
status = MS_BLOB_REFERENCED;
553
CS_SET_DISK_1(ptr.rp_head->rb_status_1, status);
557
// Sanity check of the decoded header before trusting its sizes.
if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
558
head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
559
!VALID_BLOB_STATUS(status)) {
560
/* Can't be true. Assume this is garbage! */
567
if ((status == MS_BLOB_REFERENCED) || (status == MS_BLOB_MOVED)) {
568
// Read the rest of the header (the reference table) behind the fixed part.
head->setLength(head_size);
569
if (src_file->read(head->getBuffer(0) + src_repo->myRepoBlobHeadSize, src_offset + src_repo->myRepoBlobHeadSize, head_size - src_repo->myRepoBlobHeadSize, 0) != (head_size- src_repo->myRepoBlobHeadSize)) {
577
// Loop through all the references removing temporary references
578
// and counting table and blob references.
580
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
581
for (int count = 0; count < ref_count; count++) {
582
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
583
case MS_BLOB_FREE_REF:
585
case MS_BLOB_TABLE_REF:
586
// Unlike the compactor, table refs are not checked because
587
// they do not yet exist in the backup database.
590
case MS_BLOB_DELETE_REF:
591
// These are temporary references from the TempLog file.
592
// They are not copied to the backup.
593
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
596
// Must be a BLOB reference
598
tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
599
if (tab_index && (tab_index <= ref_count)) {
600
// Only committed references are backed up.
601
if (IS_COMMITTED(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8))) {
602
MSRepoTableRefPtr tab_ref;
603
// tab_index is 1-based: it selects the table reference slot this BLOB reference points at.
tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (tab_index-1) * ref_size);
604
if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
607
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
611
/* Can't be true. Assume this is garbage! */
618
ptr.rp_chars += ref_size;
622
// If there are still blob references then the record needs to be backed up.
623
if (table_ref_count && blob_ref_count) {
627
// The record is appended at the current end of the destination repository.
dst_offset = dst_repo->myRepoFileSize;
629
/* Write the header. */
630
dst_file->write(head->getBuffer(0), dst_offset, head_size);
632
/* Copy the BLOB over: */
633
if (blob_storage_type == MS_CLOUD_STORAGE) {
634
bu_Database->myBlobCloud->cl_backupBLOB(&cloud_key);
636
CSFile::transfer(RETAIN(dst_file), dst_offset + head_size, RETAIN(src_file), src_offset + head_size, blob_size, transferBuffer, MS_BACKUP_BUFFER_SIZE);
638
/* Update the references: */
639
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
640
for (int count = 0; count < ref_count; count++) {
641
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
642
case MS_BLOB_FREE_REF:
643
case MS_BLOB_DELETE_REF:
645
case MS_BLOB_TABLE_REF:
646
tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
647
blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
649
if ((otab = MSTableList::getOpenTableByID(bu_Database->myDatabaseID, tab_id))) {
651
otab->getDBTable()->setBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, blob_size, head_size, auth_code);
652
//CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "What if an error ocurred here!");
660
ptr.rp_chars += ref_size;
663
// Account for the record just appended to the destination repository.
dst_repo->myRepoFileSize += head_size + blob_size;
667
// Advance to the next record in the source repository.
src_offset += head_size + blob_size;
669
bu_completed += src_offset - prev_offset;
671
// close the destination repository and cleanup.
673
backtopool_(dst_repo);
676
// release the source repository and get the next one in the list.
677
src_repo->backupCompleted();
678
bu_BackupList->remove(0);
680
src_repo = (MSRepository*)bu_BackupList->get(0);
684
release_(transferBuffer);
686
bu_State = BU_TERMINATED;
688
bu_State = BU_COMPLETED;
702
void *MSBackup::completeWork()
704
if (bu_SourceDatabase || bu_BackupList || bu_Compactor || bu_info) {
705
// We shouldn't be here
706
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "MSBackup::completeBackup() not called");
707
if (bu_SourceDatabase) {
708
bu_SourceDatabase->release();
709
bu_SourceDatabase = NULL;
713
bu_BackupList->release();
714
bu_BackupList = NULL;
719
bu_Compactor->release();