~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/backup_ms.cc

  • Committer: Patrick Crews
  • Date: 2010-09-14 20:21:03 UTC
  • mto: (1771.1.1 pcrews)
  • mto: This revision was merged to the branch mainline in revision 1772.
  • Revision ID: gleebix@gmail.com-20100914202103-1db2n0bshzafep19
Moved transaction_log tests into updated non-publisher-based tree

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
 
1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
2
2
 *
3
3
 * PrimeBase Media Stream for MySQL
4
4
 *
14
14
 *
15
15
 * You should have received a copy of the GNU General Public License
16
16
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
18
 *
19
19
 * Barry Leslie
20
20
 *
127
127
}
128
128
 
129
129
//-------------------------------
130
 
class StartDumpCleanUp : public CSRefObject {
131
 
        bool do_cleanup;
132
 
        uint32_t ref_id;
133
 
 
134
 
        public:
135
 
        
136
 
        StartDumpCleanUp(): CSRefObject(),
137
 
                do_cleanup(false){}
138
 
                
139
 
        ~StartDumpCleanUp() 
140
 
        {
141
 
                if (do_cleanup) {
142
 
                        MSBackupInfo::gBackupInfo->remove(ref_id);
143
 
                }
144
 
        }
145
 
        
146
 
        void setCleanUp(uint32_t id)
147
 
        {
148
 
                ref_id = id;
149
 
                do_cleanup = true;
150
 
        }
151
 
        
152
 
        void cancelCleanUp()
153
 
        {
154
 
                do_cleanup = false;
155
 
        }
156
 
        
157
 
};
158
 
 
159
130
MSBackupInfo *MSBackupInfo::startDump(MSDatabase *db, uint32_t cloud_ref, uint32_t backup_no)
160
131
{
161
132
        MSBackupInfo *info;
162
133
        uint32_t ref_id;
163
 
        StartDumpCleanUp *cleanup;
164
134
        
165
135
        enter_();
166
136
        push_(db);
176
146
 
177
147
        pop_(info);
178
148
        unlock_(gBackupInfo);
179
 
        push_(info);
180
 
        
181
 
        // Create a cleanup object to handle cleanup
182
 
        // after a possible exception.
183
 
        new_(cleanup, StartDumpCleanUp());
184
 
        push_(cleanup);
185
 
        cleanup->setCleanUp(ref_id);
186
 
        
187
 
        MSBackupTable::saveTable(RETAIN(db));
188
 
        
189
 
        cleanup->cancelCleanUp();
190
 
        release_(cleanup);
191
 
        
192
 
        pop_(info);
193
 
        release_(db);
194
 
 
 
149
        pop_(db);
 
150
        
 
151
        try_(a) {
 
152
                MSBackupTable::saveTable(db);
 
153
        }
 
154
        catch_(a);
 
155
        gBackupInfo->remove(ref_id);
 
156
        info->release();
 
157
        throw_();
 
158
        
 
159
        cont_(a);
195
160
        return_(info);
196
161
}
197
162
//-------------------------------
255
220
        return_(bu);
256
221
}
257
222
 
258
 
//-------------------------------
259
 
class StartBackupCleanUp : public CSRefObject {
260
 
        bool do_cleanup;
261
 
        MSBackup *backup;
262
 
 
263
 
        public:
264
 
        
265
 
        StartBackupCleanUp(): CSRefObject(),
266
 
                do_cleanup(false){}
267
 
                
268
 
        ~StartBackupCleanUp() 
269
 
        {
270
 
                if (do_cleanup) {
271
 
                        backup->completeBackup();
272
 
                }
273
 
        }
274
 
        
275
 
        void setCleanUp(MSBackup *bup)
276
 
        {
277
 
                backup = bup;
278
 
                do_cleanup = true;
279
 
        }
280
 
        
281
 
        void cancelCleanUp()
282
 
        {
283
 
                do_cleanup = false;
284
 
        }
285
 
        
286
 
};
287
 
 
288
223
void MSBackup::startBackup(MSDatabase *src_db)
289
224
{
290
225
        CSSyncVector    *repo_list;
291
226
        bool                    compacting = false;
292
227
        MSRepository    *repo;
293
 
        StartBackupCleanUp *cleanup;
294
228
        enter_();
295
229
 
296
 
        // Create a cleanup object to handle cleanup
297
 
        // after a possible exception.
298
 
        new_(cleanup, StartBackupCleanUp());
299
 
        push_(cleanup);
300
 
        cleanup->setCleanUp(this);
301
 
 
302
 
        bu_SourceDatabase = src_db;
303
 
        repo_list = bu_SourceDatabase->getRepositoryList();
304
 
        // Suspend the compactor before locking the list.
305
 
        bu_Compactor = bu_SourceDatabase->getCompactorThread();
306
 
        if (bu_Compactor) {
307
 
                bu_Compactor->retain();
308
 
                bu_Compactor->suspend();
309
 
        }
310
 
 
311
 
        // Build the list of repositories to be backed up.
312
 
        lock_(repo_list);
313
 
 
314
 
        new_(bu_BackupList, CSVector(repo_list->size()));
315
 
        for (uint32_t i = 0; i<repo_list->size(); i++) {
316
 
                if ((repo = (MSRepository *) repo_list->get(i))) {
317
 
                        if (!repo->isRemovingFP && !repo->mustBeDeleted) {
318
 
                                bu_BackupList->add(RETAIN(repo));
319
 
                                if (repo->initBackup() == REPO_COMPACTING) 
320
 
                                        compacting = true; 
321
 
                                
322
 
                                if (!repo->myRepoHeadSize) {
323
 
                                        /* The file has not yet been opened, so the
324
 
                                         * garbage count will not be known!
325
 
                                         */
326
 
                                        MSRepoFile *repo_file;
327
 
 
328
 
                                        //repo->retain();
329
 
                                        //unlock_(myRepostoryList);
330
 
                                        //push_(repo);
331
 
                                        repo_file = repo->openRepoFile();
332
 
                                        repo_file->release();
333
 
                                        //release_(repo);
334
 
                                        //lock_(myRepostoryList);
335
 
                                        //goto retry;
 
230
        try_(a) {
 
231
                bu_SourceDatabase = src_db;
 
232
                repo_list = bu_SourceDatabase->getRepositoryList();
 
233
                // Suspend the compactor before locking the list.
 
234
                bu_Compactor = bu_SourceDatabase->getCompactorThread();
 
235
                if (bu_Compactor) {
 
236
                        bu_Compactor->retain();
 
237
                        bu_Compactor->suspend();
 
238
                }
 
239
 
 
240
                // Build the list of repositories to be backed up.
 
241
                lock_(repo_list);
 
242
 
 
243
                new_(bu_BackupList, CSVector(repo_list->size()));
 
244
                for (uint32_t i = 0; i<repo_list->size(); i++) {
 
245
                        if ((repo = (MSRepository *) repo_list->get(i))) {
 
246
                                if (!repo->isRemovingFP && !repo->mustBeDeleted) {
 
247
                                        bu_BackupList->add(RETAIN(repo));
 
248
                                        if (repo->initBackup() == REPO_COMPACTING) 
 
249
                                                compacting = true; 
 
250
                                        
 
251
                                        if (!repo->myRepoHeadSize) {
 
252
                                                /* The file has not yet been opened, so the
 
253
                                                 * garbage count will not be known!
 
254
                                                 */
 
255
                                                MSRepoFile *repo_file;
 
256
 
 
257
                                                //repo->retain();
 
258
                                                //unlock_(myRepostoryList);
 
259
                                                //push_(repo);
 
260
                                                repo_file = repo->openRepoFile();
 
261
                                                repo_file->release();
 
262
                                                //release_(repo);
 
263
                                                //lock_(myRepostoryList);
 
264
                                                //goto retry;
 
265
                                        }
 
266
                                        
 
267
                                        bu_size += repo->myRepoFileSize; 
 
268
 
336
269
                                }
337
 
                                
338
 
                                bu_size += repo->myRepoFileSize; 
339
 
 
340
270
                        }
341
271
                }
342
 
        }
343
 
        
344
 
        // Copy the table list to the backup database:
345
 
        uint32_t                next_tab = 0;
346
 
        MSTable         *tab;
347
 
        while ((tab = bu_SourceDatabase->getNextTable(&next_tab))) {
348
 
                push_(tab);
349
 
                bu_Database->addTable(tab->myTableID, tab->myTableName->getCString(), 0, false);
350
 
                release_(tab);
351
 
        }
352
 
        unlock_(repo_list);
353
 
        
354
 
        // Copy over any physical PBMS system tables.
355
 
        PBMSSystemTables::transferSystemTables(RETAIN(bu_Database), RETAIN(bu_SourceDatabase));
356
 
 
357
 
        // Load the system tables into the backup database. This will
358
 
        // initialize the database for cloud storage if required.
359
 
        PBMSSystemTables::loadSystemTables(RETAIN(bu_Database));
360
 
        
361
 
        // Set the cloud backup info.
362
 
        bu_Database->myBlobCloud->cl_setBackupInfo(RETAIN(bu_info));
363
 
        
364
 
        
365
 
        // Set the backup number in the pbms_variable tabe. (This is a hidden value.)
366
 
        // This value is used in case a drag and drop restore was done. When a data base is
367
 
        // first loaded this value is checked and if it is not zero then the backup record
368
 
        // will be read and any used to recover any BLOBs.
369
 
        // 
370
 
        char value[20];
371
 
        snprintf(value, 20, "%"PRIu32"", bu_info->getBackupRefId());
372
 
        MSVariableTable::setVariable(RETAIN(bu_Database), BACKUP_NUMBER_VAR, value);
373
 
        
374
 
        // Once the repositories are locked the compactor can be restarted
375
 
        // unless it is in the process of compacting a repository that is
376
 
        // being backed up.
377
 
        if (bu_Compactor && !compacting) {      
378
 
                bu_Compactor->resume();         
379
 
                bu_Compactor->release();                
380
 
                bu_Compactor = NULL;            
381
 
        }
382
 
        
383
 
        // Suspend the transaction writer while the backup is running.
384
 
        MSTransactionManager::suspend(true);
385
 
        bu_TransactionManagerSuspended = true;
386
 
        
387
 
        // Start the backup daemon thread.
388
 
        bu_ID = bu_start_time = time(NULL);
389
 
        start();
390
 
        
391
 
        cleanup->cancelCleanUp();
392
 
        release_(cleanup);
393
 
 
 
272
                
 
273
                // Copy the table list to the backup database:
 
274
                uint32_t                next_tab = 0;
 
275
                MSTable         *tab;
 
276
                while ((tab = bu_SourceDatabase->getNextTable(&next_tab))) {
 
277
                        push_(tab);
 
278
                        bu_Database->addTable(tab->myTableID, tab->myTableName->getCString(), 0, false);
 
279
                        release_(tab);
 
280
                }
 
281
                unlock_(repo_list);
 
282
                
 
283
                // Copy over any physical PBMS system tables.
 
284
                PBMSSystemTables::transferSystemTables(RETAIN(bu_Database), RETAIN(bu_SourceDatabase));
 
285
 
 
286
                // Load the system tables into the backup database. This will
 
287
                // initialize the database for cloud storage if required.
 
288
                PBMSSystemTables::loadSystemTables(RETAIN(bu_Database));
 
289
                
 
290
                // Set the cloud backup info.
 
291
                bu_Database->myBlobCloud->cl_setBackupInfo(RETAIN(bu_info));
 
292
                
 
293
                
 
294
                // Set the backup number in the pbms_variable tabe. (This is a hidden value.)
 
295
                // This value is used in case a drag and drop restore was done. When a data base is
 
296
                // first loaded this value is checked and if it is not zero then the backup record
 
297
                // will be read and any used to recover any BLOBs.
 
298
                // 
 
299
                char value[20];
 
300
                snprintf(value, 20, "%"PRIu32"", bu_info->getBackupRefId());
 
301
                MSVariableTable::setVariable(RETAIN(bu_Database), BACKUP_NUMBER_VAR, value);
 
302
                
 
303
                // Once the repositories are locked the compactor can be restarted
 
304
                // unless it is in the process of compacting a repository that is
 
305
                // being backed up.
 
306
                if (bu_Compactor && !compacting) {      
 
307
                        bu_Compactor->resume();         
 
308
                        bu_Compactor->release();                
 
309
                        bu_Compactor = NULL;            
 
310
                }
 
311
                
 
312
                // Suspend the transaction writer while the backup is running.
 
313
                MSTransactionManager::suspend(true);
 
314
                bu_TransactionManagerSuspended = true;
 
315
                
 
316
                // Start the backup daemon thread.
 
317
                bu_ID = bu_start_time = time(NULL);
 
318
                start();
 
319
        }
 
320
        catch_(a) {
 
321
                completeBackup();
 
322
                throw_();
 
323
        }
 
324
        cont_(a);
 
325
        
394
326
        exit_();
 
327
 
395
328
}
396
329
 
397
330
void MSBackup::completeBackup()
446
379
 
447
380
bool MSBackup::doWork()
448
381
{
 
382
        CSMutex                         *my_lock;
 
383
        MSRepository            *src_repo, *dst_repo;
 
384
        MSRepoFile                      *src_file, *dst_file;
 
385
        off64_t                         src_offset, prev_offset;
 
386
        uint16_t                                head_size;
 
387
        uint64_t                                blob_size, blob_data_size;
 
388
        CSStringBuffer          *head;
 
389
        MSRepoPointersRec       ptr;
 
390
        uint32_t                                table_ref_count;
 
391
        uint32_t                                blob_ref_count;
 
392
        int                                     ref_count;
 
393
        size_t                          ref_size;
 
394
        uint32_t                                auth_code;
 
395
        uint32_t                                tab_id;
 
396
        uint64_t                                blob_id;
 
397
        MSOpenTable                     *otab;
 
398
        uint32_t                                src_repo_id;
 
399
        uint8_t                         status;
 
400
        uint8_t                         blob_storage_type;
 
401
        uint16_t                                tab_index;
 
402
        uint32_t                                mod_time;
 
403
        char                            transferBuffer[MS_BACKUP_BUFFER_SIZE];
 
404
        CloudKeyRec                     cloud_key;
 
405
 
 
406
        
449
407
        enter_();
 
408
        bu_BackupRunning = true;
 
409
        bu_State = BU_RUNNING; 
 
410
 
 
411
/*
 
412
        // For testing:
 
413
        {
 
414
                int blockit = 0;
 
415
                myWaitTime = 5 * 1000;  // Time in milli-seconds
 
416
                while (blockit)
 
417
                        return_(true);
 
418
        }
 
419
*/
 
420
        
450
421
        try_(a) {
451
 
                CSMutex                         *my_lock;
452
 
                MSRepository            *src_repo, *dst_repo;
453
 
                MSRepoFile                      *src_file, *dst_file;
454
 
                off64_t                         src_offset, prev_offset;
455
 
                uint16_t                                head_size;
456
 
                uint64_t                                blob_size, blob_data_size;
457
 
                CSStringBuffer          *head;
458
 
                MSRepoPointersRec       ptr;
459
 
                uint32_t                                table_ref_count;
460
 
                uint32_t                                blob_ref_count;
461
 
                int                                     ref_count;
462
 
                size_t                          ref_size;
463
 
                uint32_t                                auth_code;
464
 
                uint32_t                                tab_id;
465
 
                uint64_t                                blob_id;
466
 
                MSOpenTable                     *otab;
467
 
                uint32_t                                src_repo_id;
468
 
                uint8_t                         status;
469
 
                uint8_t                         blob_storage_type;
470
 
                uint16_t                                tab_index;
471
 
                uint32_t                                mod_time;
472
 
                char                            *transferBuffer;
473
 
                CloudKeyRec                     cloud_key;
474
 
 
475
 
                transferBuffer= (char*)malloc(MS_BACKUP_BUFFER_SIZE);
476
 
 
477
 
                bu_BackupRunning = true;
478
 
                bu_State = BU_RUNNING; 
479
 
 
480
 
        /*
481
 
                // For testing:
482
 
                {
483
 
                        int blockit = 0;
484
 
                        myWaitTime = 5 * 1000;  // Time in milli-seconds
485
 
                        while (blockit)
486
 
                                return_(true);
487
 
                }
488
 
        */
489
 
        
490
422
                new_(head, CSStringBuffer(100));
491
423
                push_(head);
492
424
 
633
565
                                                if (blob_storage_type == MS_CLOUD_STORAGE) { 
634
566
                                                        bu_Database->myBlobCloud->cl_backupBLOB(&cloud_key);
635
567
                                                } else
636
 
                                                        CSFile::transfer(RETAIN(dst_file), dst_offset + head_size, RETAIN(src_file), src_offset + head_size, blob_size, transferBuffer, MS_BACKUP_BUFFER_SIZE);
 
568
                                                        CSFile::transfer(dst_file, dst_offset + head_size, src_file, src_offset + head_size, blob_size, transferBuffer, MS_BACKUP_BUFFER_SIZE);
637
569
                                        
638
570
                                                /* Update the references: */
639
571
                                                ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
685
617
                        bu_State = BU_TERMINATED; 
686
618
                else
687
619
                        bu_State = BU_COMPLETED; 
688
 
 
689
 
                free(transferBuffer);
 
620
                        
690
621
        }       
691
622
        
692
623
        catch_(a) {