~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/backup_ms.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
mergeĀ lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2009-05-29
22
 
 *
23
 
 * H&G2JCtL
24
 
 *
25
 
 * Repository backup.
26
 
 *
27
 
 * The backup is done by creating a new database with the same name and ID in the 
28
 
 * backup location. Then the pbms_dump table in the source database is initialized
29
 
 * for a sequential scan for backup. This has the effect of locking all current repository
30
 
 * files. Then the equvalent of  'insert into dst_db.pbms_dump (select * from src_db.pbms_dump);'
31
 
 * is performed. 
32
 
 *
33
 
 */
34
 
 
35
 
#ifdef DRIZZLED
36
 
#include <config.h>
37
 
#include <drizzled/common.h>
38
 
#include <drizzled/session.h>
39
 
#include <drizzled/table.h>
40
 
#include <drizzled/message/table.pb.h>
41
 
#include <drizzled/charset.h>
42
 
#include <drizzled/table_proto.h>
43
 
#include <drizzled/field.h>
44
 
#include <drizzled/field/varstring.h>
45
 
#endif
46
 
 
47
 
#include "cslib/CSConfig.h"
48
 
 
49
 
#include <sys/types.h>
50
 
#include <inttypes.h>
51
 
 
52
 
#include "cslib/CSGlobal.h"
53
 
#include "cslib/CSStrUtil.h"
54
 
#include "cslib/CSStorage.h"
55
 
 
56
 
#include "defs_ms.h"
57
 
#include "system_table_ms.h"
58
 
#include "open_table_ms.h"
59
 
#include "table_ms.h"
60
 
#include "database_ms.h"
61
 
#include "repository_ms.h"
62
 
#include "backup_ms.h"
63
 
#include "transaction_ms.h"
64
 
#include "systab_variable_ms.h"
65
 
#include "systab_backup_ms.h"
66
 
 
67
 
uint32_t MSBackupInfo::gMaxInfoRef;
68
 
CSSyncSparseArray *MSBackupInfo::gBackupInfo;
69
 
 
70
 
//==========================================
71
 
MSBackupInfo::MSBackupInfo(     uint32_t id, 
72
 
                                                        const char *name, 
73
 
                                                        uint32_t db_id_arg, 
74
 
                                                        time_t start, 
75
 
                                                        time_t end, 
76
 
                                                        bool _isDump, 
77
 
                                                        const char *location, 
78
 
                                                        uint32_t cloudRef_arg, 
79
 
                                                        uint32_t cloudBackupNo_arg ):
80
 
        backupRefId(id),
81
 
        db_name(NULL),
82
 
        db_id(db_id_arg),
83
 
        startTime(start),
84
 
        completionTime(end),
85
 
        dump(_isDump),
86
 
        isRunning(false),
87
 
        backupLocation(NULL),
88
 
        cloudRef(cloudRef_arg),
89
 
        cloudBackupNo(cloudBackupNo_arg)
90
 
{
91
 
        db_name = CSString::newString(name);
92
 
        if (location && *location)              
93
 
                backupLocation = CSString::newString(location);         
94
 
}
95
 
 
96
 
//-------------------------------
97
 
MSBackupInfo::~MSBackupInfo()
98
 
{
99
 
        if (db_name)
100
 
                db_name->release();
101
 
        
102
 
        if (backupLocation)
103
 
                backupLocation->release();
104
 
}
105
 
 
106
 
//-------------------------------
107
 
void MSBackupInfo::startBackup(MSDatabase *pbms_db)
108
 
{
109
 
        MSDatabase *src_db;
110
 
        
111
 
        enter_();
112
 
        push_(pbms_db);
113
 
        
114
 
        src_db = MSDatabase::getDatabase(db_id);
115
 
        push_(src_db);
116
 
        
117
 
        startTime = time(NULL);
118
 
        
119
 
        src_db->startBackup(RETAIN(this));
120
 
        release_(src_db);
121
 
        
122
 
        isRunning = true;
123
 
        
124
 
        pop_(pbms_db);
125
 
        MSBackupTable::saveTable(pbms_db);
126
 
        exit_();
127
 
}
128
 
 
129
 
//-------------------------------
130
 
class StartDumpCleanUp : public CSRefObject {
131
 
        bool do_cleanup;
132
 
        uint32_t ref_id;
133
 
 
134
 
        public:
135
 
        
136
 
        StartDumpCleanUp(): CSRefObject(),
137
 
                do_cleanup(false){}
138
 
                
139
 
        ~StartDumpCleanUp() 
140
 
        {
141
 
                if (do_cleanup) {
142
 
                        MSBackupInfo::gBackupInfo->remove(ref_id);
143
 
                }
144
 
        }
145
 
        
146
 
        void setCleanUp(uint32_t id)
147
 
        {
148
 
                ref_id = id;
149
 
                do_cleanup = true;
150
 
        }
151
 
        
152
 
        void cancelCleanUp()
153
 
        {
154
 
                do_cleanup = false;
155
 
        }
156
 
        
157
 
};
158
 
 
159
 
MSBackupInfo *MSBackupInfo::startDump(MSDatabase *db, uint32_t cloud_ref, uint32_t backup_no)
160
 
{
161
 
        MSBackupInfo *info;
162
 
        uint32_t ref_id;
163
 
        StartDumpCleanUp *cleanup;
164
 
        
165
 
        enter_();
166
 
        push_(db);
167
 
        lock_(gBackupInfo);
168
 
        
169
 
        ref_id = gMaxInfoRef++;
170
 
        new_(info, MSBackupInfo(ref_id, db->myDatabaseName->getCString(), db->myDatabaseID, time(NULL), 0, true, NULL, cloud_ref, backup_no));
171
 
        push_(info);
172
 
        
173
 
        gBackupInfo->set(ref_id, RETAIN(info));
174
 
        
175
 
        info->isRunning = true;
176
 
 
177
 
        pop_(info);
178
 
        unlock_(gBackupInfo);
179
 
        push_(info);
180
 
        
181
 
        // Create a cleanup object to handle cleanup
182
 
        // after a possible exception.
183
 
        new_(cleanup, StartDumpCleanUp());
184
 
        push_(cleanup);
185
 
        cleanup->setCleanUp(ref_id);
186
 
        
187
 
        MSBackupTable::saveTable(RETAIN(db));
188
 
        
189
 
        cleanup->cancelCleanUp();
190
 
        release_(cleanup);
191
 
        
192
 
        pop_(info);
193
 
        release_(db);
194
 
 
195
 
        return_(info);
196
 
}
197
 
//-------------------------------
198
 
void MSBackupInfo::backupCompleted(MSDatabase *db)
199
 
{
200
 
        completionTime = time(NULL);    
201
 
        isRunning = false;
202
 
        MSBackupTable::saveTable(db);
203
 
}
204
 
 
205
 
//-------------------------------
206
 
void MSBackupInfo::backupTerminated(MSDatabase *db)
207
 
{
208
 
        enter_();
209
 
        push_(db);
210
 
        lock_(gBackupInfo);
211
 
        
212
 
        gBackupInfo->remove(backupRefId);
213
 
        unlock_(gBackupInfo);
214
 
        
215
 
        pop_(db);
216
 
        MSBackupTable::saveTable(db);
217
 
        exit_();
218
 
}
219
 
 
220
 
//==========================================
221
 
MSBackup::MSBackup():
222
 
CSDaemon(NULL),
223
 
bu_info(NULL),
224
 
bu_BackupList(NULL),
225
 
bu_Compactor(NULL),
226
 
bu_BackupRunning(false),
227
 
bu_State(BU_COMPLETED),
228
 
bu_SourceDatabase(NULL),
229
 
bu_Database(NULL),
230
 
bu_dst_dump(NULL),
231
 
bu_src_dump(NULL),
232
 
bu_size(0),
233
 
bu_completed(0),
234
 
bu_ID(0),
235
 
bu_start_time(0),
236
 
bu_TransactionManagerSuspended(false)
237
 
{
238
 
}
239
 
 
240
 
MSBackup *MSBackup::newMSBackup(MSBackupInfo *info)
241
 
{
242
 
        MSBackup *bu;
243
 
        enter_();
244
 
        
245
 
        push_(info);
246
 
        
247
 
        new_(bu, MSBackup());
248
 
        push_(bu);
249
 
        bu->bu_Database = MSDatabase::getBackupDatabase(RETAIN(info->backupLocation), RETAIN(info->db_name), info->db_id, true);
250
 
        pop_(bu);
251
 
        
252
 
        bu->bu_info = info;
253
 
        pop_(info);
254
 
 
255
 
        return_(bu);
256
 
}
257
 
 
258
 
//-------------------------------
259
 
class StartBackupCleanUp : public CSRefObject {
260
 
        bool do_cleanup;
261
 
        MSBackup *backup;
262
 
 
263
 
        public:
264
 
        
265
 
        StartBackupCleanUp(): CSRefObject(),
266
 
                do_cleanup(false){}
267
 
                
268
 
        ~StartBackupCleanUp() 
269
 
        {
270
 
                if (do_cleanup) {
271
 
                        backup->completeBackup();
272
 
                }
273
 
        }
274
 
        
275
 
        void setCleanUp(MSBackup *bup)
276
 
        {
277
 
                backup = bup;
278
 
                do_cleanup = true;
279
 
        }
280
 
        
281
 
        void cancelCleanUp()
282
 
        {
283
 
                do_cleanup = false;
284
 
        }
285
 
        
286
 
};
287
 
 
288
 
void MSBackup::startBackup(MSDatabase *src_db)
289
 
{
290
 
        CSSyncVector    *repo_list;
291
 
        bool                    compacting = false;
292
 
        MSRepository    *repo;
293
 
        StartBackupCleanUp *cleanup;
294
 
        enter_();
295
 
 
296
 
        // Create a cleanup object to handle cleanup
297
 
        // after a possible exception.
298
 
        new_(cleanup, StartBackupCleanUp());
299
 
        push_(cleanup);
300
 
        cleanup->setCleanUp(this);
301
 
 
302
 
        bu_SourceDatabase = src_db;
303
 
        repo_list = bu_SourceDatabase->getRepositoryList();
304
 
        // Suspend the compactor before locking the list.
305
 
        bu_Compactor = bu_SourceDatabase->getCompactorThread();
306
 
        if (bu_Compactor) {
307
 
                bu_Compactor->retain();
308
 
                bu_Compactor->suspend();
309
 
        }
310
 
 
311
 
        // Build the list of repositories to be backed up.
312
 
        lock_(repo_list);
313
 
 
314
 
        new_(bu_BackupList, CSVector(repo_list->size()));
315
 
        for (uint32_t i = 0; i<repo_list->size(); i++) {
316
 
                if ((repo = (MSRepository *) repo_list->get(i))) {
317
 
                        if (!repo->isRemovingFP && !repo->mustBeDeleted) {
318
 
                                bu_BackupList->add(RETAIN(repo));
319
 
                                if (repo->initBackup() == REPO_COMPACTING) 
320
 
                                        compacting = true; 
321
 
                                
322
 
                                if (!repo->myRepoHeadSize) {
323
 
                                        /* The file has not yet been opened, so the
324
 
                                         * garbage count will not be known!
325
 
                                         */
326
 
                                        MSRepoFile *repo_file;
327
 
 
328
 
                                        //repo->retain();
329
 
                                        //unlock_(myRepostoryList);
330
 
                                        //push_(repo);
331
 
                                        repo_file = repo->openRepoFile();
332
 
                                        repo_file->release();
333
 
                                        //release_(repo);
334
 
                                        //lock_(myRepostoryList);
335
 
                                        //goto retry;
336
 
                                }
337
 
                                
338
 
                                bu_size += repo->myRepoFileSize; 
339
 
 
340
 
                        }
341
 
                }
342
 
        }
343
 
        
344
 
        // Copy the table list to the backup database:
345
 
        uint32_t                next_tab = 0;
346
 
        MSTable         *tab;
347
 
        while ((tab = bu_SourceDatabase->getNextTable(&next_tab))) {
348
 
                push_(tab);
349
 
                bu_Database->addTable(tab->myTableID, tab->myTableName->getCString(), 0, false);
350
 
                release_(tab);
351
 
        }
352
 
        unlock_(repo_list);
353
 
        
354
 
        // Copy over any physical PBMS system tables.
355
 
        PBMSSystemTables::transferSystemTables(RETAIN(bu_Database), RETAIN(bu_SourceDatabase));
356
 
 
357
 
        // Load the system tables into the backup database. This will
358
 
        // initialize the database for cloud storage if required.
359
 
        PBMSSystemTables::loadSystemTables(RETAIN(bu_Database));
360
 
        
361
 
        // Set the cloud backup info.
362
 
        bu_Database->myBlobCloud->cl_setBackupInfo(RETAIN(bu_info));
363
 
        
364
 
        
365
 
        // Set the backup number in the pbms_variable tabe. (This is a hidden value.)
366
 
        // This value is used in case a drag and drop restore was done. When a data base is
367
 
        // first loaded this value is checked and if it is not zero then the backup record
368
 
        // will be read and any used to recover any BLOBs.
369
 
        // 
370
 
        char value[20];
371
 
        snprintf(value, 20, "%"PRIu32"", bu_info->getBackupRefId());
372
 
        MSVariableTable::setVariable(RETAIN(bu_Database), BACKUP_NUMBER_VAR, value);
373
 
        
374
 
        // Once the repositories are locked the compactor can be restarted
375
 
        // unless it is in the process of compacting a repository that is
376
 
        // being backed up.
377
 
        if (bu_Compactor && !compacting) {      
378
 
                bu_Compactor->resume();         
379
 
                bu_Compactor->release();                
380
 
                bu_Compactor = NULL;            
381
 
        }
382
 
        
383
 
        // Suspend the transaction writer while the backup is running.
384
 
        MSTransactionManager::suspend(true);
385
 
        bu_TransactionManagerSuspended = true;
386
 
        
387
 
        // Start the backup daemon thread.
388
 
        bu_ID = bu_start_time = time(NULL);
389
 
        start();
390
 
        
391
 
        cleanup->cancelCleanUp();
392
 
        release_(cleanup);
393
 
 
394
 
        exit_();
395
 
}
396
 
 
397
 
void MSBackup::completeBackup()
398
 
{
399
 
        if (bu_TransactionManagerSuspended) {   
400
 
                MSTransactionManager::resume();
401
 
                bu_TransactionManagerSuspended = false;
402
 
        }
403
 
 
404
 
        if (bu_BackupList) {
405
 
                MSRepository *repo;             
406
 
                
407
 
                while (bu_BackupList->size()) {
408
 
                        repo = (MSRepository *) bu_BackupList->take(0);
409
 
                        if (repo) {                             
410
 
                                repo->backupCompleted();
411
 
                                repo->release();                                
412
 
                        }
413
 
                }
414
 
                bu_BackupList->release();
415
 
                bu_BackupList = NULL;
416
 
        }
417
 
                
418
 
        if (bu_Compactor) {
419
 
                bu_Compactor->resume();
420
 
                bu_Compactor->release();
421
 
                bu_Compactor = NULL;
422
 
        }
423
 
        
424
 
        if (bu_Database) {
425
 
                if (bu_State == BU_COMPLETED)
426
 
                        bu_Database->releaseBackupDatabase();
427
 
                else 
428
 
                        MSDatabase::dropDatabase(bu_Database);
429
 
                        
430
 
                bu_Database = NULL;
431
 
        }
432
 
 
433
 
        if (bu_SourceDatabase){
434
 
                if (bu_State == BU_COMPLETED) 
435
 
                        bu_info->backupCompleted(bu_SourceDatabase);
436
 
                else 
437
 
                        bu_info->backupTerminated(bu_SourceDatabase);
438
 
                
439
 
                bu_SourceDatabase = NULL;
440
 
                bu_info->release();
441
 
                bu_info = NULL;
442
 
        }
443
 
        
444
 
        bu_BackupRunning = false;
445
 
}
446
 
 
447
 
bool MSBackup::doWork()
448
 
{
449
 
        enter_();
450
 
        try_(a) {
451
 
                CSMutex                         *my_lock;
452
 
                MSRepository            *src_repo, *dst_repo;
453
 
                MSRepoFile                      *src_file, *dst_file;
454
 
                off64_t                         src_offset, prev_offset;
455
 
                uint16_t                                head_size;
456
 
                uint64_t                                blob_size, blob_data_size;
457
 
                CSStringBuffer          *head;
458
 
                MSRepoPointersRec       ptr;
459
 
                uint32_t                                table_ref_count;
460
 
                uint32_t                                blob_ref_count;
461
 
                int                                     ref_count;
462
 
                size_t                          ref_size;
463
 
                uint32_t                                auth_code;
464
 
                uint32_t                                tab_id;
465
 
                uint64_t                                blob_id;
466
 
                MSOpenTable                     *otab;
467
 
                uint8_t                         status;
468
 
                uint8_t                         blob_storage_type;
469
 
                uint16_t                                tab_index;
470
 
                uint32_t                                mod_time;
471
 
                char                            *transferBuffer;
472
 
                CloudKeyRec                     cloud_key;
473
 
 
474
 
        
475
 
                bu_BackupRunning = true;
476
 
                bu_State = BU_RUNNING; 
477
 
 
478
 
        /*
479
 
                // For testing:
480
 
                {
481
 
                        int blockit = 0;
482
 
                        myWaitTime = 5 * 1000;  // Time in milli-seconds
483
 
                        while (blockit)
484
 
                                return_(true);
485
 
                }
486
 
        */
487
 
        
488
 
                transferBuffer = (char*) cs_malloc(MS_BACKUP_BUFFER_SIZE);
489
 
                push_ptr_(transferBuffer);
490
 
                
491
 
                new_(head, CSStringBuffer(100));
492
 
                push_(head);
493
 
 
494
 
                src_repo = (MSRepository*)bu_BackupList->get(0);
495
 
                while (src_repo && !myMustQuit) {
496
 
                        src_offset = 0;
497
 
                        src_file = src_repo->openRepoFile();
498
 
                        push_(src_file);
499
 
 
500
 
                        dst_repo = bu_Database->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
501
 
                        frompool_(dst_repo);
502
 
                        dst_file = dst_repo->openRepoFile();
503
 
                        push_(dst_file);
504
 
                        
505
 
                        src_offset = src_repo->myRepoHeadSize;
506
 
                        prev_offset = 0;
507
 
                        while (src_offset < src_repo->myRepoFileSize) { 
508
 
        retry_read:
509
 
                                        
510
 
                                bu_completed += src_offset - prev_offset;
511
 
                                prev_offset = src_offset;
512
 
                                suspended();
513
 
 
514
 
                                if (myMustQuit)
515
 
                                        break;
516
 
                                
517
 
                                // A lock is required here because references and dereferences to the
518
 
                                // BLOBs can result in the repository record being updated while 
519
 
                                // it is being copied.
520
 
                                my_lock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
521
 
                                lock_(my_lock);
522
 
                                head->setLength(src_repo->myRepoBlobHeadSize);
523
 
                                if (src_file->read(head->getBuffer(0), src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) { 
524
 
                                        unlock_(my_lock);
525
 
                                        break;
526
 
                                }
527
 
                                        
528
 
                                ptr.rp_chars = head->getBuffer(0);
529
 
                                ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
530
 
                                ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
531
 
                                head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
532
 
                                blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
533
 
                                blob_data_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_data_size_6);
534
 
                                auth_code = CS_GET_DISK_4(ptr.rp_head->rb_auth_code_4);
535
 
                                status = CS_GET_DISK_1(ptr.rp_head->rb_status_1);
536
 
                                mod_time = CS_GET_DISK_4(ptr.rp_head->rb_mod_time_4);
537
 
                                
538
 
                                blob_storage_type = CS_GET_DISK_1(ptr.rp_head->rb_storage_type_1);
539
 
                                if (blob_storage_type == MS_CLOUD_STORAGE) {
540
 
                                        MSRepoFile::getBlobKey(ptr.rp_head, &cloud_key);
541
 
                                }
542
 
 
543
 
                                // If the BLOB was modified after the start of the backup
544
 
                                // then set the mod time to the backup time to ensure that
545
 
                                // a backup for update will work correctly.
546
 
                                if (mod_time > bu_start_time)
547
 
                                        CS_SET_DISK_4(ptr.rp_head->rb_mod_time_4, bu_start_time);
548
 
                                        
549
 
                                // If the BLOB was moved during the time of this backup then copy
550
 
                                // it to the backup location as a referenced BLOB.
551
 
                                if ((status == MS_BLOB_MOVED)  && (bu_ID == (uint32_t) CS_GET_DISK_4(ptr.rp_head->rb_backup_id_4))) {
552
 
                                        status = MS_BLOB_REFERENCED;
553
 
                                        CS_SET_DISK_1(ptr.rp_head->rb_status_1, status);
554
 
                                }
555
 
                                
556
 
                                // sanity check
557
 
                                if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
558
 
                                        head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
559
 
                                        !VALID_BLOB_STATUS(status)) {
560
 
                                        /* Can't be true. Assume this is garbage! */
561
 
                                        src_offset++;
562
 
                                        unlock_(my_lock);
563
 
                                        continue;
564
 
                                }
565
 
                                
566
 
                                
567
 
                                if ((status == MS_BLOB_REFERENCED) || (status == MS_BLOB_MOVED)) {
568
 
                                        head->setLength(head_size);
569
 
                                        if (src_file->read(head->getBuffer(0) + src_repo->myRepoBlobHeadSize, src_offset + src_repo->myRepoBlobHeadSize, head_size  - src_repo->myRepoBlobHeadSize, 0) != (head_size- src_repo->myRepoBlobHeadSize)) {
570
 
                                                unlock_(my_lock);
571
 
                                                break;
572
 
                                        }
573
 
 
574
 
                                        table_ref_count = 0;
575
 
                                        blob_ref_count = 0;
576
 
                                        
577
 
                                        // Loop through all the references removing temporary references 
578
 
                                        // and counting table and blob references.
579
 
                                        
580
 
                                        ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
581
 
                                        for (int count = 0; count < ref_count; count++) {
582
 
                                                switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
583
 
                                                        case MS_BLOB_FREE_REF:
584
 
                                                                break;
585
 
                                                        case MS_BLOB_TABLE_REF:
586
 
                                                                // Unlike the compactor, table refs are not checked because
587
 
                                                                // they do not yet exist in the backup database.
588
 
                                                                table_ref_count++;
589
 
                                                                break;
590
 
                                                        case MS_BLOB_DELETE_REF:
591
 
                                                                // These are temporary references from the TempLog file. 
592
 
                                                                // They are not copied to the backup. 
593
 
                                                                CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
594
 
                                                                break;
595
 
                                                        default:
596
 
                                                                // Must be a BLOB reference
597
 
                                                                
598
 
                                                                tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
599
 
                                                                if (tab_index && (tab_index <= ref_count)) {
600
 
                                                                        // Only committed references are backed up.
601
 
                                                                        if (IS_COMMITTED(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8))) {
602
 
                                                                                MSRepoTableRefPtr       tab_ref;
603
 
                                                                                tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (tab_index-1) * ref_size);
604
 
                                                                                if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
605
 
                                                                                        blob_ref_count++;
606
 
                                                                        } else {
607
 
                                                                                CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
608
 
                                                                        }
609
 
                                                                
610
 
                                                                } else {
611
 
                                                                        /* Can't be true. Assume this is garbage! */
612
 
                                                                        src_offset++;
613
 
                                                                        unlock_(my_lock);
614
 
                                                                        goto retry_read;
615
 
                                                                }
616
 
                                                                break;
617
 
                                                }
618
 
                                                ptr.rp_chars += ref_size;
619
 
                                        }
620
 
 
621
 
 
622
 
                                        // If there are still blob references then the record needs to be backed up.
623
 
                                        if (table_ref_count && blob_ref_count) {
624
 
 
625
 
                                                off64_t dst_offset;
626
 
 
627
 
                                                dst_offset = dst_repo->myRepoFileSize;
628
 
                                                
629
 
                                                /* Write the header. */
630
 
                                                dst_file->write(head->getBuffer(0), dst_offset, head_size);
631
 
 
632
 
                                                /* Copy the BLOB over: */
633
 
                                                if (blob_storage_type == MS_CLOUD_STORAGE) { 
634
 
                                                        bu_Database->myBlobCloud->cl_backupBLOB(&cloud_key);
635
 
                                                } else
636
 
                                                        CSFile::transfer(RETAIN(dst_file), dst_offset + head_size, RETAIN(src_file), src_offset + head_size, blob_size, transferBuffer, MS_BACKUP_BUFFER_SIZE);
637
 
                                        
638
 
                                                /* Update the references: */
639
 
                                                ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
640
 
                                                for (int count = 0; count < ref_count; count++) {
641
 
                                                        switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
642
 
                                                                case MS_BLOB_FREE_REF:
643
 
                                                                case MS_BLOB_DELETE_REF:
644
 
                                                                        break;
645
 
                                                                case MS_BLOB_TABLE_REF:
646
 
                                                                        tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
647
 
                                                                        blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
648
 
 
649
 
                                                                        if ((otab = MSTableList::getOpenTableByID(bu_Database->myDatabaseID, tab_id))) {
650
 
                                                                                frompool_(otab);
651
 
                                                                                otab->getDBTable()->setBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, blob_size, head_size, auth_code);
652
 
//CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "What if an error ocurred here!");
653
 
 
654
 
                                                                                backtopool_(otab);
655
 
                                                                        }
656
 
                                                                        break;
657
 
                                                                default:
658
 
                                                                        break;
659
 
                                                        }
660
 
                                                        ptr.rp_chars += ref_size;
661
 
                                                }
662
 
 
663
 
                                                dst_repo->myRepoFileSize += head_size + blob_size;
664
 
                                        }
665
 
                                }
666
 
                                unlock_(my_lock);
667
 
                                src_offset += head_size + blob_size;
668
 
                        }
669
 
                        bu_completed += src_offset - prev_offset;
670
 
                        
671
 
                        // close the destination repository and cleanup.
672
 
                        release_(dst_file);
673
 
                        backtopool_(dst_repo);
674
 
                        release_(src_file);
675
 
                        
676
 
                        // release the source repository and get the next one in the list.
677
 
                        src_repo->backupCompleted();
678
 
                        bu_BackupList->remove(0);
679
 
                        
680
 
                        src_repo = (MSRepository*)bu_BackupList->get(0);
681
 
                }
682
 
                                
683
 
                release_(head);
684
 
                release_(transferBuffer);
685
 
                if (myMustQuit)
686
 
                        bu_State = BU_TERMINATED; 
687
 
                else
688
 
                        bu_State = BU_COMPLETED; 
689
 
                        
690
 
        }       
691
 
        
692
 
        catch_(a) {
693
 
                logException();
694
 
        }
695
 
        
696
 
        cont_(a);       
697
 
        completeBackup();
698
 
        myMustQuit = true;
699
 
        return_(true);
700
 
}
701
 
 
702
 
void *MSBackup::completeWork()
703
 
{
704
 
        if (bu_SourceDatabase || bu_BackupList || bu_Compactor || bu_info) {
705
 
                // We shouldn't be here
706
 
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "MSBackup::completeBackup() not called");
707
 
                if (bu_SourceDatabase) {
708
 
                         bu_SourceDatabase->release();
709
 
                         bu_SourceDatabase = NULL;
710
 
                }
711
 
                        
712
 
                if (bu_BackupList) {
713
 
                         bu_BackupList->release();
714
 
                         bu_BackupList = NULL;
715
 
                }
716
 
 
717
 
                        
718
 
                if (bu_Compactor) {
719
 
                         bu_Compactor->release();
720
 
                         bu_Compactor = NULL;
721
 
                }
722
 
 
723
 
                        
724
 
                if (bu_info) {
725
 
                         bu_info->release();
726
 
                         bu_info = NULL;
727
 
                }
728
 
 
729
 
        }
730
 
        return NULL;
731
 
}