~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/transaction_ms.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
mergeĀ lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2009-07-09
22
 
 *
23
 
 * H&G2JCtL
24
 
 *
25
 
 * PBMS transaction daemon.
26
 
 *
27
 
 *
28
 
 */
29
 
 
30
 
#include "cslib/CSConfig.h"
31
 
 
32
 
#include <inttypes.h>
33
 
 
34
 
#include "defs_ms.h"
35
 
 
36
 
#include "cslib/CSGlobal.h"
37
 
#include "cslib/CSStrUtil.h"
38
 
#include "cslib/CSLog.h"
39
 
 
40
 
#include "mysql_ms.h"
41
 
#include "open_table_ms.h"
42
 
#include "trans_log_ms.h"
43
 
#include "transaction_ms.h"
44
 
#include "pbmsdaemon_ms.h"
45
 
 
46
 
/*
47
 
 * The pbms_ functions are utility functions supplied by ha_pbms.cc
48
 
 */
49
 
void    pbms_take_part_in_transaction(void *thread);
50
 
 
51
 
MSTrans *MSTransactionManager::tm_Log;
52
 
MSTransactionThread *MSTransactionManager::tm_Reader;
53
 
 
54
 
 
55
 
typedef  struct {
56
 
        CSDiskValue4    lr_time_4;              // The database ID for the operation.
57
 
        CSDiskValue1    lr_state_1;             // The transaction state. 
58
 
        CSDiskValue1    lr_type_1;              // The transaction type. If the first bit is set then the transaction is an autocommit.
59
 
        CSDiskValue4    lr_db_id_4;             // The database ID for the operation.
60
 
        CSDiskValue4    lr_tab_id_4;    // The table ID for the operation.
61
 
        CSDiskValue8    lr_blob_id_8;   // The blob ID for the operation.
62
 
        CSDiskValue8    lr_blob_ref_id_8;// The blob reference id.
63
 
} MSDiskLostRec, *MSDiskLostPtr;
64
 
 
65
 
/*
66
 
 * ---------------------------------------------------------------
67
 
 * The transaction reader thread 
68
 
 */
69
 
 
70
 
class MSTransactionThread : public CSDaemon {
71
 
public:
72
 
        MSTransactionThread(MSTrans *txn_log);
73
 
        
74
 
        virtual ~MSTransactionThread(){} // Do nothing here because 'self' will no longer be valid, use completeWork().
75
 
 
76
 
 
77
 
        void close();
78
 
 
79
 
        virtual bool doWork();
80
 
 
81
 
        virtual void *completeWork();
82
 
        
83
 
        void flush();
84
 
        
85
 
        bool trt_is_ready;
86
 
private:
87
 
        void reportLostReference(MSTransPtr rec, MS_TxnState state);
88
 
        void dereference(MSTransPtr rec, MS_TxnState state);
89
 
        void commitReference(MSTransPtr rec, MS_TxnState state);
90
 
        
91
 
        MSTrans *trt_log;
92
 
        CSFile  *trt_lostLog;
93
 
 
94
 
};
95
 
 
96
 
MSTransactionThread::MSTransactionThread(MSTrans *txn_log):
97
 
CSDaemon(0, NULL),
98
 
trt_is_ready(false),
99
 
trt_log(txn_log),
100
 
trt_lostLog(NULL)
101
 
{
102
 
        trt_log->txn_SetReader(this);
103
 
}
104
 
 
105
 
void MSTransactionThread::close()
106
 
{
107
 
        if (trt_lostLog)
108
 
                trt_lostLog->close();
109
 
}
110
 
 
111
 
void MSTransactionThread::reportLostReference(MSTransPtr rec, MS_TxnState state)
112
 
{
113
 
        MSDiskLostRec lrec;
114
 
        const char *t_txt, *s_txt;
115
 
        char b1[16], b2[16], msg[100];
116
 
        MSDatabase *db;
117
 
        MSTable *tab;
118
 
        
119
 
        //if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == true)
120
 
                //return;
121
 
 
122
 
        enter_();
123
 
        // Do not report errors caused by missing databases or tables.
124
 
        // This can happen if the transaction log is reread after a crash
125
 
        // and transactions are found that belonged to dropped databases
126
 
        // or tables.
127
 
        db = MSDatabase::getDatabase(rec->tr_db_id, true);
128
 
        if (!db)
129
 
                goto dont_worry_about_it;
130
 
                
131
 
        push_(db);
132
 
        tab = db->getTable(rec->tr_tab_id, true);
133
 
        release_(db);
134
 
        if (!tab)
135
 
                goto dont_worry_about_it;
136
 
        tab->release();
137
 
        
138
 
        switch (state) {
139
 
                case MS_Committed:
140
 
                        s_txt = "Commit";
141
 
                        break;
142
 
                case MS_RolledBack:
143
 
                        s_txt = "RolledBack";
144
 
                        break;
145
 
                case MS_Recovered:
146
 
                        s_txt = "Recovered";
147
 
                        break;
148
 
                case MS_Running:
149
 
                        s_txt = "Running";
150
 
                        break;
151
 
                default:
152
 
                        snprintf(b1, 16, "(%d)?", state);
153
 
                        s_txt = b1;
154
 
        }
155
 
 
156
 
        switch (TRANS_TYPE(rec->tr_type)) {
157
 
                case MS_DereferenceTxn:
158
 
                        t_txt = "Dereference";
159
 
                        break;
160
 
                case MS_ReferenceTxn:
161
 
                        t_txt = "Reference";
162
 
                        break;
163
 
                default:
164
 
                        snprintf(b2, 16, "(%x)?", rec->tr_type);
165
 
                        t_txt = b2;
166
 
        }
167
 
 
168
 
        snprintf(msg, 100, "Lost PBMS record: %s %s db_id: %"PRIu32" tab_id: %"PRIu32" blob_id: %"PRIu64"", s_txt, t_txt, rec->tr_db_id, rec->tr_tab_id, rec->tr_blob_id);
169
 
        CSL.logLine(self, CSLog::Warning, msg);
170
 
 
171
 
        CS_SET_DISK_4(lrec.lr_time_4, time(NULL));
172
 
        CS_SET_DISK_1(lrec.lr_state_1, state);
173
 
        CS_SET_DISK_1(lrec.lr_type_1, rec->tr_type);
174
 
        CS_SET_DISK_4(lrec.lr_db_id_4, rec->tr_db_id);
175
 
        CS_SET_DISK_4(lrec.lr_tab_id_4, rec->tr_tab_id);
176
 
        CS_SET_DISK_8(lrec.lr_blob_id_8, rec->tr_blob_id);
177
 
        CS_SET_DISK_8(lrec.lr_blob_ref_id_8, rec->tr_blob_ref_id);
178
 
        
179
 
        if (!trt_lostLog) {
180
 
                CSPath *path;
181
 
                char *str = cs_strdup(trt_log->txn_GetTXNLogPath());
182
 
                cs_remove_last_name_of_path(str);
183
 
                
184
 
                path = CSPath::newPath(str, "pbms_lost_txn.dat");
185
 
                cs_free(str);
186
 
                
187
 
                trt_lostLog = CSFile::newFile(path);
188
 
                trt_lostLog->open(CSFile::CREATE);
189
 
        }
190
 
        trt_lostLog->write(&lrec, trt_lostLog->getEOF(), sizeof(MSDiskLostRec));
191
 
        trt_lostLog->sync();
192
 
        
193
 
dont_worry_about_it:
194
 
        exit_();
195
 
        
196
 
}
197
 
 
198
 
void MSTransactionThread::dereference(MSTransPtr rec, MS_TxnState state)
199
 
{
200
 
        enter_();
201
 
        
202
 
        try_(a) {
203
 
                MSOpenTable             *otab;
204
 
                otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
205
 
                frompool_(otab);
206
 
                otab->freeReference(rec->tr_blob_id, rec->tr_blob_ref_id);
207
 
                backtopool_(otab);
208
 
        }
209
 
        
210
 
        catch_(a) {
211
 
                reportLostReference(rec, state);
212
 
        }
213
 
        
214
 
        cont_(a);       
215
 
        exit_();
216
 
}
217
 
 
218
 
void MSTransactionThread::commitReference(MSTransPtr rec, MS_TxnState state)
219
 
{
220
 
        enter_();
221
 
        
222
 
        try_(a) {
223
 
                MSOpenTable             *otab;
224
 
                otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
225
 
                frompool_(otab);
226
 
                otab->commitReference(rec->tr_blob_id, rec->tr_blob_ref_id);
227
 
                backtopool_(otab);
228
 
        }
229
 
        
230
 
        catch_(a) {
231
 
                reportLostReference(rec, state);
232
 
        }
233
 
        
234
 
        cont_(a);       
235
 
        exit_();
236
 
}
237
 
 
238
 
void MSTransactionThread::flush()
239
 
{
240
 
        enter_();
241
 
        
242
 
        // For now I just wait until the transaction queue is empty or
243
 
        // the transaction at the head of the queue has not yet been
244
 
        // committed.
245
 
        //
246
 
        // What needs to be done is for the transaction log to scan 
247
 
        // past the non commited transaction to see if there are any
248
 
        // other committed transaction in the log and apply them if found.
249
 
        
250
 
        wakeup(); // Incase the reader is sleeping.
251
 
        while (trt_log->txn_haveNextTransaction() && !isSuspend() && self->myMustQuit)
252
 
                self->sleep(10);                
253
 
        exit_();
254
 
}
255
 
 
256
 
bool MSTransactionThread::doWork()
257
 
{
258
 
        enter_();
259
 
        
260
 
        try_(a) {
261
 
                MSTransRec rec = {0,0,0,0,0,0,0};
262
 
                MS_TxnState state;
263
 
                while (!myMustQuit) {
264
 
                        // This will sleep while waiting for the next 
265
 
                        // completed transaction.
266
 
                        trt_log->txn_GetNextTransaction(&rec, &state); 
267
 
                        if (myMustQuit)
268
 
                                break;
269
 
                                
270
 
                        if (rec.tr_db_id == 0) // The database was dropped.
271
 
                                continue;
272
 
                                
273
 
                        if (state == MS_Committed){
274
 
                                if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) 
275
 
                                        dereference(&rec, state);
276
 
                                else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
277
 
                                        commitReference(&rec, state);
278
 
 
279
 
                        } else if (state == MS_RolledBack) { 
280
 
                                // There is nothing to do on rollback of a dereference.
281
 
                                
282
 
                                if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
283
 
                                        dereference(&rec, state);
284
 
                                        
285
 
                        } else if (state == MS_Recovered) { 
286
 
                                if ((TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn))
287
 
                                        reportLostReference(&rec, state); // Report these even though they may not be lost.
288
 
                                
289
 
                                // Because of the 2 phase commit issue with other engines I cannot
290
 
                                // just roll back the transaction because it may have been committed 
291
 
                                // on the master engine. So to be safe I will always err on the side
292
 
                                // of having unreference BLOBs in the repository rather than risking
293
 
                                // deleting a BLOB that was referenced. To this end I will commit references
294
 
                                // while ignoring (rolling back) dereferences.
295
 
                                if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
296
 
                                        commitReference(&rec, state);
297
 
                                
298
 
                        }
299
 
                }
300
 
        }
301
 
        
302
 
        catch_(a) {
303
 
                self->logException();
304
 
                CSL.logLine(NULL, CSLog::Error, "!!!!!!!! THE PBMS TRANSACTION LOG READER DIED! !!!!!!!!!!!");
305
 
        }
306
 
        cont_(a);
307
 
        return_(true);
308
 
}
309
 
 
310
 
void *MSTransactionThread::completeWork()
311
 
{
312
 
        close();
313
 
        
314
 
        if (trt_log)
315
 
                trt_log->release();
316
 
                
317
 
        if (trt_lostLog)
318
 
                trt_lostLog->release();
319
 
        return NULL;
320
 
}
321
 
 
322
 
/*
323
 
 * ---------------------------------------------------------------
324
 
 * The transaction log manager 
325
 
 */
326
 
void MSTransactionManager::startUpReader()
327
 
{
328
 
        char pbms_path[PATH_MAX];
329
 
        enter_();
330
 
        
331
 
        cs_strcpy(PATH_MAX, pbms_path, PBMSDaemon::getPBMSDir()); 
332
 
        cs_add_name_to_path(PATH_MAX, pbms_path, "ms-trans-log.dat");
333
 
        
334
 
        tm_Log = MSTrans::txn_NewMSTrans(pbms_path);
335
 
        new_(tm_Reader, MSTransactionThread(RETAIN(tm_Log)));
336
 
 
337
 
        tm_Reader->start();
338
 
        
339
 
        // Wait for the transaction reader to recover any old transaction:
340
 
        tm_Reader->flush();
341
 
                
342
 
        exit_();
343
 
}
344
 
 
345
 
void MSTransactionManager::startUp()
346
 
{
347
 
        CSPath *path = NULL;
348
 
        enter_();
349
 
        
350
 
        // Do not start the reader if the pbms dir doesn't exist.
351
 
        path = CSPath::newPath(PBMSDaemon::getPBMSDir());
352
 
        push_(path);
353
 
        if (path->exists()) {
354
 
                startUpReader();
355
 
        }
356
 
        release_(path);
357
 
        
358
 
        exit_();
359
 
}
360
 
 
361
 
void MSTransactionManager::shutDown()
362
 
{
363
 
        if (tm_Reader) {
364
 
                tm_Reader->stop();
365
 
                tm_Reader->release();
366
 
                tm_Reader = NULL;
367
 
        }
368
 
        if (tm_Log) {
369
 
                tm_Log->release();
370
 
                tm_Log = NULL;
371
 
        }
372
 
}
373
 
 
374
 
void MSTransactionManager::flush()
375
 
{
376
 
        if (tm_Reader) 
377
 
                tm_Reader->flush();
378
 
}
379
 
 
380
 
void MSTransactionManager::suspend(bool do_flush)
381
 
{
382
 
        enter_();
383
 
        
384
 
        if (do_flush) 
385
 
                flush();
386
 
                
387
 
        if (tm_Reader) {
388
 
                tm_Reader->suspend();
389
 
        }       
390
 
        exit_();
391
 
}
392
 
 
393
 
void MSTransactionManager::resume()
394
 
{
395
 
        enter_();
396
 
        if (tm_Reader) {
397
 
                tm_Reader->resume();
398
 
        }       
399
 
        exit_();
400
 
}
401
 
 
402
 
void MSTransactionManager::commit()
403
 
{
404
 
        enter_();
405
 
        
406
 
        if (!tm_Log)
407
 
                startUpReader();
408
 
                
409
 
        self->myStmtCount = 0;
410
 
        self->myStartStmt = 0;
411
 
 
412
 
        tm_Log->txn_LogTransaction(MS_CommitTxn);
413
 
        
414
 
 
415
 
        exit_();
416
 
}
417
 
 
418
 
void MSTransactionManager::rollback()
419
 
{
420
 
        enter_();
421
 
        
422
 
        if (!tm_Log)
423
 
                startUpReader();
424
 
                
425
 
        self->myStmtCount = 0;
426
 
        self->myStartStmt = 0;
427
 
 
428
 
        tm_Log->txn_LogTransaction(MS_RollBackTxn);
429
 
 
430
 
        exit_();
431
 
}
432
 
 
433
 
class MSTransactionCheckPoint: public CSString
434
 
{
435
 
        public:
436
 
        MSTransactionCheckPoint(const char *name, uint32_t stmtCount ):CSString(name)
437
 
        {
438
 
                position = stmtCount;
439
 
        }
440
 
        
441
 
        uint32_t position;
442
 
};
443
 
 
444
 
#ifdef DRIZZLED
445
 
void MSTransactionManager::setSavepoint(const char *savePoint)
446
 
{
447
 
        MSTransactionCheckPoint *checkPoint;
448
 
        enter_();
449
 
        
450
 
        new_(checkPoint, MSTransactionCheckPoint(savePoint, self->myStmtCount));
451
 
        
452
 
        push_(checkPoint);
453
 
        self->mySavePoints.add(checkPoint);
454
 
        pop_(checkPoint);
455
 
        
456
 
        exit_();
457
 
}
458
 
 
459
 
void MSTransactionManager::releaseSavepoint(const char *savePoint)
460
 
{
461
 
        MSTransactionCheckPoint *checkPoint;
462
 
        CSString *name;
463
 
        enter_();
464
 
        
465
 
        name = CSString::newString(savePoint);
466
 
        push_(name);
467
 
 
468
 
        checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
469
 
        release_(name);
470
 
        
471
 
        if (checkPoint)                 
472
 
                self->mySavePoints.remove(checkPoint);
473
 
                
474
 
        exit_();
475
 
}
476
 
 
477
 
void MSTransactionManager::rollbackTo(const char *savePoint)
478
 
{
479
 
        MSTransactionCheckPoint *checkPoint;
480
 
        CSString *name;
481
 
        enter_();
482
 
        
483
 
        name = CSString::newString(savePoint);
484
 
        push_(name);
485
 
 
486
 
        checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
487
 
        release_(name);
488
 
        
489
 
        if (checkPoint) {
490
 
                uint32_t position = checkPoint->position;
491
 
                
492
 
                self->mySavePoints.remove(checkPoint);
493
 
                rollbackToPosition(position);
494
 
        }
495
 
                
496
 
        exit_();
497
 
}
498
 
#endif
499
 
 
500
 
void MSTransactionManager::rollbackToPosition(uint32_t position)
501
 
{
502
 
        enter_();
503
 
 
504
 
        ASSERT(self->myStmtCount > position);
505
 
        
506
 
        if (!tm_Log)
507
 
                startUpReader();
508
 
        tm_Log->txn_LogPartialRollBack(position);
509
 
        
510
 
        exit_();
511
 
}
512
 
 
513
 
void MSTransactionManager::dropDatabase(uint32_t db_id)
514
 
{
515
 
        enter_();
516
 
 
517
 
        if (!tm_Log)
518
 
                startUpReader();
519
 
        
520
 
        tm_Log->txn_dropDatabase(db_id);
521
 
 
522
 
        exit_();
523
 
}
524
 
 
525
 
void MSTransactionManager::logTransaction(bool ref, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
526
 
{
527
 
        enter_();
528
 
        
529
 
        if (!tm_Log)
530
 
                startUpReader();
531
 
 
532
 
        if (!self->myTID) {
533
 
                bool autocommit = false;
534
 
                autocommit = ms_is_autocommit();
535
 
#ifndef DRIZZLED
536
 
                if (!autocommit)
537
 
                        pbms_take_part_in_transaction(ms_my_get_thread());
538
 
#endif
539
 
                        
540
 
                self->myIsAutoCommit = autocommit;
541
 
        }
542
 
        
543
 
        // PBMS always explicitly commits
544
 
        tm_Log->txn_LogTransaction((ref)?MS_ReferenceTxn:MS_DereferenceTxn, false /*autocommit*/, db_id, tab_id, blob_id, blob_ref_id);
545
 
 
546
 
        self->myStmtCount++;
547
 
        
548
 
        exit_();
549
 
}
550
 
 
551