1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27
* The backup is done by creating a new database with the same name and ID in the
28
* backup location. Then the pbms_dump table in the source database is initialized
29
* for a sequential scan for backup. This has the effect of locking all current repository
30
* files. Then the equivalent of 'insert into dst_db.pbms_dump (select * from src_db.pbms_dump);'
36
#include <drizzled/server_includes.h>
43
#include "CSStrUtil.h"
44
#include "CSStorage.h"
47
#include "SystemTable_ms.h"
48
#include "OpenTable_ms.h"
50
#include "Database_ms.h"
51
#include "Repository_ms.h"
52
#include "backup_ms.h"
53
#include "backup_ms.h"
54
#include "Transaction_ms.h"
55
#include "SysTab_variable.h"
56
#include "SysTab_backup.h"
58
uint32_t MSBackupInfo::gMaxInfoRef;
59
CSSyncSparseArray *MSBackupInfo::gBackupInfo;
61
//==========================================
62
MSBackupInfo::MSBackupInfo( uint32_t id,
69
uint32_t cloudRef_arg,
70
uint32_t cloudBackupNo_arg ):
79
cloudRef(cloudRef_arg),
80
cloudBackupNo(cloudBackupNo_arg)
82
db_name = CSString::newString(name);
83
if (location && *location)
84
backupLocation = CSString::newString(location);
87
//-------------------------------
88
MSBackupInfo::~MSBackupInfo()
94
backupLocation->release();
97
//-------------------------------
98
void MSBackupInfo::startBackup(MSDatabase *pbms_db)
105
src_db = MSDatabase::getDatabase(db_id);
108
startTime = time(NULL);
110
src_db->startBackup(RETAIN(this));
116
MSBackupTable::saveTable(pbms_db);
120
//-------------------------------
121
MSBackupInfo *MSBackupInfo::startDump(MSDatabase *db, uint32_t cloud_ref, uint32_t backup_no)
130
ref_id = gMaxInfoRef++;
131
new_(info, MSBackupInfo(ref_id, db->myDatabaseName->getCString(), db->myDatabaseID, time(NULL), 0, true, NULL, cloud_ref, backup_no));
134
gBackupInfo->set(ref_id, RETAIN(info));
136
info->isRunning = true;
139
unlock_(gBackupInfo);
143
MSBackupTable::saveTable(db);
146
gBackupInfo->remove(ref_id);
153
//-------------------------------
154
void MSBackupInfo::backupCompleted(MSDatabase *db)
156
completionTime = time(NULL);
158
MSBackupTable::saveTable(db);
161
//-------------------------------
162
void MSBackupInfo::backupTerminated(MSDatabase *db)
168
gBackupInfo->remove(backupRefId);
169
unlock_(gBackupInfo);
172
MSBackupTable::saveTable(db);
176
//==========================================
177
MSBackup::MSBackup():
179
bu_SourceDatabase(NULL),
185
bu_BackupRunning(false),
190
MSBackup::~MSBackup()
193
if (bu_SourceDatabase || bu_BackupList || bu_Compactor || bu_info) {
194
// We shouldn't be here
195
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "MSBackup::completeBackup() not called");
196
if (bu_SourceDatabase)
197
bu_SourceDatabase->release();
200
bu_BackupList->release();
203
bu_Compactor->release();
211
MSBackup *MSBackup::newMSBackup(MSBackupInfo *info)
218
new_(bu, MSBackup());
220
bu->bu_Database = MSDatabase::getBackupDatabase(RETAIN(info->backupLocation), RETAIN(info->db_name), info->db_id, true);
229
void MSBackup::startBackup(MSDatabase *src_db)
231
CSSyncVector *repo_list;
232
bool compacting = false;
237
bu_SourceDatabase = src_db;
238
repo_list = bu_SourceDatabase->getRepositoryList();
239
// Suspend the compactor before locking the list.
240
bu_Compactor = bu_SourceDatabase->getCompactorThread();
242
bu_Compactor->retain();
243
bu_Compactor->suspend();
246
// Build the list of repositories to be backed up.
249
new_(bu_BackupList, CSVector(repo_list->size()));
250
for (u_int i = 0; i<repo_list->size(); i++) {
251
if ((repo = (MSRepository *) repo_list->get(i))) {
252
if (!repo->isRemovingFP && !repo->mustBeDeleted) {
253
bu_BackupList->add(RETAIN(repo));
254
if (repo->initBackup() == REPO_COMPACTING)
257
if (!repo->myRepoHeadSize) {
258
/* The file has not yet been opened, so the
259
* garbage count will not be known!
261
MSRepoFile *repo_file;
264
//unlock_(myRepostoryList);
266
repo_file = repo->openRepoFile();
267
repo_file->release();
269
//lock_(myRepostoryList);
273
bu_size += repo->myRepoFileSize;
279
// Copy the table list to the backup database:
280
uint32_t next_tab = 0;
282
while ((tab = bu_SourceDatabase->getNextTable(&next_tab))) {
284
bu_Database->addTable(tab->myTableID, tab->myTableName->getCString(), 0, false);
289
// Copy over any physical PBMS system tables.
290
pbms_transfer_ststem_tables(RETAIN(bu_Database), RETAIN(bu_SourceDatabase));
292
// Load the system tables into the backup database. This will
293
// initialize the database for cloud storage if required.
294
pbms_load_system_tables(RETAIN(bu_Database));
296
// Set the cloud backup info.
297
bu_Database->myBlobCloud->cl_setBackupInfo(RETAIN(bu_info));
300
// Set the backup number in the pbms_variable table. (This is a hidden value.)
301
// This value is used in case a drag and drop restore was done. When a database is
302
// first loaded this value is checked and if it is not zero then the backup record
303
// will be read and used to recover any BLOBs.
306
snprintf(value, 20, "%"PRIu32"", bu_info->getBackupRefId());
307
MSVariableTable::setVariable(RETAIN(bu_Database), BACKUP_NUMBER_VAR, value);
309
// Once the repositories are locked the compactor can be restarted
310
// unless it is in the process of compacting a repository that is
312
if (bu_Compactor && !compacting) {
313
bu_Compactor->resume();
314
bu_Compactor->release();
318
// Suspend the transaction writer while the backup is running.
319
MSTransactionManager::suspend(true);
320
bu_TransactionManagerSuspended = true;
322
// Start the backup daemon thread.
323
bu_ID = bu_start_time = time(NULL);
336
void MSBackup::completeBackup()
338
if (bu_TransactionManagerSuspended) {
339
MSTransactionManager::resume();
340
bu_TransactionManagerSuspended = false;
346
while (bu_BackupList->size()) {
347
repo = (MSRepository *) bu_BackupList->take(0);
349
repo->backupCompleted();
353
bu_BackupList->release();
354
bu_BackupList = NULL;
358
bu_Compactor->resume();
359
bu_Compactor->release();
364
if (bu_State == BU_COMPLETED)
365
bu_Database->releaseBackupDatabase();
367
MSDatabase::dropDatabase(bu_Database);
372
if (bu_SourceDatabase){
373
if (bu_State == BU_COMPLETED)
374
bu_info->backupCompleted(bu_SourceDatabase);
376
bu_info->backupTerminated(bu_SourceDatabase);
378
bu_SourceDatabase = NULL;
383
bu_BackupRunning = false;
386
bool MSBackup::doWork()
389
MSRepository *src_repo, *dst_repo;
390
MSRepoFile *src_file, *dst_file;
391
off_t src_offset, prev_offset;
393
uint64_t blob_size, blob_data_size;
394
CSStringBuffer *head;
395
MSRepoPointersRec ptr;
396
u_int table_ref_count;
397
u_int blob_ref_count;
404
uint32_t src_repo_id;
406
uint8_t blob_storage_type;
409
char transferBuffer[MS_BACKUP_BUFFER_SIZE];
410
CloudKeyRec cloud_key;
414
bu_BackupRunning = true;
415
bu_State = BU_RUNNING;
421
myWaitTime = 5 * 1000; // Time in milli-seconds
428
new_(head, CSStringBuffer(100));
431
src_repo = (MSRepository*)bu_BackupList->get(0);
432
while (src_repo && !myMustQuit) {
434
src_file = src_repo->openRepoFile();
437
dst_repo = bu_Database->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
439
dst_file = dst_repo->openRepoFile();
442
src_repo_id = src_repo->myRepoID;
443
src_offset = src_repo->myRepoHeadSize;
445
while (src_offset < src_repo->myRepoFileSize) {
448
bu_completed += src_offset - prev_offset;
449
prev_offset = src_offset;
455
// A lock is required here because references and dereferences to the
456
// BLOBs can result in the repository record being updated while
457
// it is being copied.
458
lock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
460
head->setLength(src_repo->myRepoBlobHeadSize);
461
if (src_file->read(head->getBuffer(0), src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
466
ptr.rp_chars = head->getBuffer(0);
467
ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
468
ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
469
head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
470
blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
471
blob_data_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_data_size_6);
472
auth_code = CS_GET_DISK_4(ptr.rp_head->rb_auth_code_4);
473
status = CS_GET_DISK_1(ptr.rp_head->rb_status_1);
474
mod_time = CS_GET_DISK_4(ptr.rp_head->rb_mod_time_4);
476
blob_storage_type = CS_GET_DISK_1(ptr.rp_head->rb_storage_type_1);
477
if (blob_storage_type == MS_CLOUD_STORAGE) {
478
MSRepoFile::getBlobKey(ptr.rp_head, &cloud_key);
481
// If the BLOB was modified after the start of the backup
482
// then set the mod time to the backup time to ensure that
483
// a backup for update will work correctly.
484
if (mod_time > bu_start_time)
485
CS_SET_DISK_4(ptr.rp_head->rb_mod_time_4, bu_start_time);
487
// If the BLOB was moved during the time of this backup then copy
488
// it to the backup location as a referenced BLOB.
489
if ((status == MS_BLOB_MOVED) && (bu_ID == (uint32_t) CS_GET_DISK_4(ptr.rp_head->rb_backup_id_4))) {
490
status = MS_BLOB_REFERENCED;
491
CS_SET_DISK_1(ptr.rp_head->rb_status_1, status);
495
if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
496
head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
497
!VALID_BLOB_STATUS(status)) {
498
/* Can't be true. Assume this is garbage! */
505
if ((status == MS_BLOB_REFERENCED) || (status == MS_BLOB_MOVED)) {
506
head->setLength(head_size);
507
if (src_file->read(head->getBuffer(0) + src_repo->myRepoBlobHeadSize, src_offset + src_repo->myRepoBlobHeadSize, head_size - src_repo->myRepoBlobHeadSize, 0) != (head_size- src_repo->myRepoBlobHeadSize)) {
515
// Loop through all the references removing temporary references
516
// and counting table and blob references.
518
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
519
for (int count = 0; count < ref_count; count++) {
520
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
521
case MS_BLOB_FREE_REF:
523
case MS_BLOB_TABLE_REF:
524
// Unlike the compactor, table refs are not checked because
525
// they do not yet exist in the backup database.
528
case MS_BLOB_DELETE_REF:
529
// These are temporary references from the TempLog file.
530
// They are not copied to the backup.
531
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
534
// Must be a BLOB reference
536
tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
537
if (tab_index && (tab_index <= ref_count)) {
538
// Only committed references are backed up.
539
if (IS_COMMITTED(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8))) {
540
MSRepoTableRefPtr tab_ref;
541
tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (tab_index-1) * ref_size);
542
if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
545
CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
549
/* Can't be true. Assume this is garbage! */
556
ptr.rp_chars += ref_size;
560
// If there are still blob references then the record needs to be backed up.
561
if (table_ref_count && blob_ref_count) {
565
dst_offset = dst_repo->myRepoFileSize;
567
/* Write the header. */
568
dst_file->write(head->getBuffer(0), dst_offset, head_size);
570
/* Copy the BLOB over: */
571
if (blob_storage_type == MS_CLOUD_STORAGE) {
572
bu_Database->myBlobCloud->cl_backupBLOB(&cloud_key);
574
CSFile::transfer(dst_file, dst_offset + head_size, src_file, src_offset + head_size, blob_size, transferBuffer, MS_BACKUP_BUFFER_SIZE);
576
/* Update the references: */
577
ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
578
for (int count = 0; count < ref_count; count++) {
579
switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
580
case MS_BLOB_FREE_REF:
581
case MS_BLOB_DELETE_REF:
583
case MS_BLOB_TABLE_REF:
584
tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
585
blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
587
if ((otab = MSTableList::getOpenTableByID(bu_Database->myDatabaseID, tab_id))) {
589
otab->getDBTable()->setBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, blob_size, head_size, auth_code);
590
//CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "What if an error ocurred here!");
598
ptr.rp_chars += ref_size;
601
dst_repo->myRepoFileSize += head_size + blob_size;
605
src_offset += head_size + blob_size;
607
bu_completed += src_offset - prev_offset;
609
// close the destination repository and cleanup.
611
backtopool_(dst_repo);
614
// release the source repository and get the next one in the list.
615
src_repo->backupCompleted();
616
bu_BackupList->remove(0);
618
src_repo = (MSRepository*)bu_BackupList->get(0);
623
bu_State = BU_TERMINATED;
625
bu_State = BU_COMPLETED;
639
void *MSBackup::finalize()