~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/transaction_ms.cc

Merge Stewart's dead code removal

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2009-07-09
22
 
 *
23
 
 * H&G2JCtL
24
 
 *
25
 
 * PBMS transaction daemon.
26
 
 *
27
 
 *
28
 
 */
29
 
 
30
 
#include "cslib/CSConfig.h"
31
 
 
32
 
#include <inttypes.h>
33
 
 
34
 
#include "defs_ms.h"
35
 
 
36
 
#include "cslib/CSGlobal.h"
37
 
#include "cslib/CSStrUtil.h"
38
 
#include "cslib/CSLog.h"
39
 
 
40
 
#include "mysql_ms.h"
41
 
#include "open_table_ms.h"
42
 
#include "trans_log_ms.h"
43
 
#include "transaction_ms.h"
44
 
#include "pbmsdaemon_ms.h"
45
 
 
46
 
/*
47
 
 * The pbms_ functions are utility functions supplied by ha_pbms.cc
48
 
 */
49
 
void    pbms_take_part_in_transaction(void *thread);
50
 
 
51
 
MSTrans *MSTransactionManager::tm_Log;
52
 
MSTransactionThread *MSTransactionManager::tm_Reader;
53
 
 
54
 
 
55
 
typedef  struct {
56
 
        CSDiskValue4    lr_time_4;              // The database ID for the operation.
57
 
        CSDiskValue1    lr_state_1;             // The transaction state. 
58
 
        CSDiskValue1    lr_type_1;              // The transaction type. If the first bit is set then the transaction is an autocommit.
59
 
        CSDiskValue4    lr_db_id_4;             // The database ID for the operation.
60
 
        CSDiskValue4    lr_tab_id_4;    // The table ID for the operation.
61
 
        CSDiskValue8    lr_blob_id_8;   // The blob ID for the operation.
62
 
        CSDiskValue8    lr_blob_ref_id_8;// The blob reference id.
63
 
} MSDiskLostRec, *MSDiskLostPtr;
64
 
 
65
 
/*
66
 
 * ---------------------------------------------------------------
67
 
 * The transaction reader thread 
68
 
 */
69
 
 
70
 
class MSTransactionThread : public CSDaemon {
71
 
public:
72
 
        MSTransactionThread(MSTrans *txn_log);
73
 
        
74
 
        virtual ~MSTransactionThread(){} // Do nothing here because 'self' will no longer be valid, use completeWork().
75
 
 
76
 
 
77
 
        void close();
78
 
 
79
 
        virtual bool doWork();
80
 
 
81
 
        virtual void *completeWork();
82
 
        
83
 
        void flush();
84
 
        
85
 
        bool trt_is_ready;
86
 
private:
87
 
        void reportLostReference(MSTransPtr rec, MS_TxnState state);
88
 
        void dereference(MSTransPtr rec, MS_TxnState state);
89
 
        void commitReference(MSTransPtr rec, MS_TxnState state);
90
 
        
91
 
        MSTrans *trt_log;
92
 
        CSFile  *trt_lostLog;
93
 
 
94
 
};
95
 
 
96
 
MSTransactionThread::MSTransactionThread(MSTrans *txn_log):
97
 
CSDaemon(0, NULL),
98
 
trt_is_ready(false),
99
 
trt_log(txn_log),
100
 
trt_lostLog(NULL)
101
 
{
102
 
        trt_log->txn_SetReader(this);
103
 
}
104
 
 
105
 
void MSTransactionThread::close()
106
 
{
107
 
        if (trt_lostLog)
108
 
                trt_lostLog->close();
109
 
}
110
 
 
111
 
void MSTransactionThread::reportLostReference(MSTransPtr rec, MS_TxnState state)
112
 
{
113
 
        MSDiskLostRec lrec;
114
 
        const char *t_txt, *s_txt;
115
 
        char b1[16], b2[16], msg[100];
116
 
        
117
 
        //if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == true)
118
 
                //return;
119
 
 
120
 
        enter_();
121
 
        
122
 
        switch (state) {
123
 
                case MS_Committed:
124
 
                        s_txt = "Commit";
125
 
                        break;
126
 
                case MS_RolledBack:
127
 
                        s_txt = "RolledBack";
128
 
                        break;
129
 
                case MS_Recovered:
130
 
                        s_txt = "Recovered";
131
 
                        break;
132
 
                case MS_Running:
133
 
                        s_txt = "Running";
134
 
                        break;
135
 
                default:
136
 
                        snprintf(b1, 16, "(%d)?", state);
137
 
                        s_txt = b1;
138
 
        }
139
 
 
140
 
        switch (TRANS_TYPE(rec->tr_type)) {
141
 
                case MS_DereferenceTxn:
142
 
                        t_txt = "Dereference";
143
 
                        break;
144
 
                case MS_ReferenceTxn:
145
 
                        t_txt = "Reference";
146
 
                        break;
147
 
                default:
148
 
                        snprintf(b2, 16, "(%x)?", rec->tr_type);
149
 
                        t_txt = b2;
150
 
        }
151
 
 
152
 
        snprintf(msg, 100, "Lost PBMS record: %s %s db_id: %"PRIu32" tab_id: %"PRIu32" blob_id: %"PRIu64"", s_txt, t_txt, rec->tr_db_id, rec->tr_tab_id, rec->tr_blob_id);
153
 
        CSL.logLine(self, CSLog::Warning, msg);
154
 
 
155
 
        CS_SET_DISK_4(lrec.lr_time_4, time(NULL));
156
 
        CS_SET_DISK_1(lrec.lr_state_1, state);
157
 
        CS_SET_DISK_1(lrec.lr_type_1, rec->tr_type);
158
 
        CS_SET_DISK_4(lrec.lr_db_id_4, rec->tr_db_id);
159
 
        CS_SET_DISK_4(lrec.lr_tab_id_4, rec->tr_tab_id);
160
 
        CS_SET_DISK_8(lrec.lr_blob_id_8, rec->tr_blob_id);
161
 
        CS_SET_DISK_8(lrec.lr_blob_ref_id_8, rec->tr_blob_ref_id);
162
 
        
163
 
        if (!trt_lostLog) {
164
 
                CSPath *path;
165
 
                char *str = cs_strdup(trt_log->txn_GetTXNLogPath());
166
 
                cs_remove_last_name_of_path(str);
167
 
                
168
 
                path = CSPath::newPath(str, "pbms_lost_txn.dat");
169
 
                cs_free(str);
170
 
                
171
 
                trt_lostLog = CSFile::newFile(path);
172
 
                trt_lostLog->open(CSFile::CREATE);
173
 
        }
174
 
        trt_lostLog->write(&lrec, trt_lostLog->getEOF(), sizeof(MSDiskLostRec));
175
 
        trt_lostLog->sync();
176
 
        exit_();
177
 
        
178
 
}
179
 
 
180
 
void MSTransactionThread::dereference(MSTransPtr rec, MS_TxnState state)
181
 
{
182
 
        enter_();
183
 
        
184
 
        try_(a) {
185
 
                MSOpenTable             *otab;
186
 
                otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
187
 
                frompool_(otab);
188
 
                otab->freeReference(rec->tr_blob_id, rec->tr_blob_ref_id);
189
 
                backtopool_(otab);
190
 
        }
191
 
        
192
 
        catch_(a) {
193
 
                reportLostReference(rec, state);
194
 
        }
195
 
        
196
 
        cont_(a);       
197
 
        exit_();
198
 
}
199
 
 
200
 
void MSTransactionThread::commitReference(MSTransPtr rec, MS_TxnState state)
201
 
{
202
 
        enter_();
203
 
        
204
 
        try_(a) {
205
 
                MSOpenTable             *otab;
206
 
                otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
207
 
                frompool_(otab);
208
 
                otab->commitReference(rec->tr_blob_id, rec->tr_blob_ref_id);
209
 
                backtopool_(otab);
210
 
        }
211
 
        
212
 
        catch_(a) {
213
 
                reportLostReference(rec, state);
214
 
        }
215
 
        
216
 
        cont_(a);       
217
 
        exit_();
218
 
}
219
 
 
220
 
void MSTransactionThread::flush()
221
 
{
222
 
        enter_();
223
 
        
224
 
        // For now I just wait until the transaction queue is empty or
225
 
        // the transaction at the head of the queue has not yet been
226
 
        // committed.
227
 
        //
228
 
        // What needs to be done is for the transaction log to scan 
229
 
        // past the non commited transaction to see if there are any
230
 
        // other committed transaction in the log and apply them if found.
231
 
        
232
 
        wakeup(); // Incase the reader is sleeping.
233
 
        while (trt_log->txn_haveNextTransaction() && !isSuspend() && self->myMustQuit)
234
 
                self->sleep(10);                
235
 
        exit_();
236
 
}
237
 
 
238
 
bool MSTransactionThread::doWork()
239
 
{
240
 
        enter_();
241
 
        
242
 
        try_(a) {
243
 
                MSTransRec rec = {0,0,0,0,0,0,0};
244
 
                MS_TxnState state;
245
 
                while (!myMustQuit) {
246
 
                        // This will sleep while waiting for the next 
247
 
                        // completed transaction.
248
 
                        trt_log->txn_GetNextTransaction(&rec, &state); 
249
 
                        if (myMustQuit)
250
 
                                break;
251
 
                                
252
 
                        if (rec.tr_db_id == 0) // The database was dropped.
253
 
                                continue;
254
 
                                
255
 
                        if (state == MS_Committed){
256
 
                                if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) 
257
 
                                        dereference(&rec, state);
258
 
                                else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
259
 
                                        commitReference(&rec, state);
260
 
 
261
 
                        } else if (state == MS_RolledBack) { 
262
 
                                // There is nothing to do on rollback of a dereference.
263
 
                                
264
 
                                if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
265
 
                                        dereference(&rec, state);
266
 
                                        
267
 
                        } else if (state == MS_Recovered) { 
268
 
                                if ((TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn))
269
 
                                        reportLostReference(&rec, state); // Report these even though they may not be lost.
270
 
                                
271
 
                                // Because of the 2 phase commit issue with other engines I cannot
272
 
                                // just roll back the transaction because it may have been committed 
273
 
                                // on the master engine. So to be safe I will always err on the side
274
 
                                // of having unreference BLOBs in the repository rather than risking
275
 
                                // deleting a BLOB that was referenced. To this end I will commit references
276
 
                                // while ignoring (rolling back) dereferences.
277
 
                                if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
278
 
                                        commitReference(&rec, state);
279
 
                                
280
 
                        }
281
 
                }
282
 
        }
283
 
        
284
 
        catch_(a) {
285
 
                self->logException();
286
 
                CSL.logLine(NULL, CSLog::Error, "!!!!!!!! THE PBMS TRANSACTION LOG READER DIED! !!!!!!!!!!!");
287
 
        }
288
 
        cont_(a);
289
 
        return_(true);
290
 
}
291
 
 
292
 
void *MSTransactionThread::completeWork()
293
 
{
294
 
        close();
295
 
        
296
 
        if (trt_log)
297
 
                trt_log->release();
298
 
                
299
 
        if (trt_lostLog)
300
 
                trt_lostLog->release();
301
 
        return NULL;
302
 
}
303
 
 
304
 
/*
305
 
 * ---------------------------------------------------------------
306
 
 * The transaction log manager 
307
 
 */
308
 
void MSTransactionManager::startUpReader()
309
 
{
310
 
        char pbms_path[PATH_MAX];
311
 
        enter_();
312
 
        
313
 
        cs_strcpy(PATH_MAX, pbms_path, PBMSDaemon::getPBMSDir()); 
314
 
        cs_add_name_to_path(PATH_MAX, pbms_path, "ms-trans-log.dat");
315
 
        
316
 
        tm_Log = MSTrans::txn_NewMSTrans(pbms_path);
317
 
        new_(tm_Reader, MSTransactionThread(RETAIN(tm_Log)));
318
 
 
319
 
        tm_Reader->start();
320
 
        
321
 
        // Wait for the transaction reader to recover any old transaction:
322
 
        tm_Reader->flush();
323
 
                
324
 
        exit_();
325
 
}
326
 
 
327
 
void MSTransactionManager::startUp()
328
 
{
329
 
        CSPath *path = NULL;
330
 
        enter_();
331
 
        
332
 
        // Do not start the reader if the pbms dir doesn't exist.
333
 
        path = CSPath::newPath(PBMSDaemon::getPBMSDir());
334
 
        push_(path);
335
 
        if (path->exists()) {
336
 
                startUpReader();
337
 
        }
338
 
        release_(path);
339
 
        
340
 
        exit_();
341
 
}
342
 
 
343
 
void MSTransactionManager::shutDown()
344
 
{
345
 
        if (tm_Reader) {
346
 
                tm_Reader->stop();
347
 
                tm_Reader->release();
348
 
                tm_Reader = NULL;
349
 
        }
350
 
        if (tm_Log) {
351
 
                tm_Log->release();
352
 
                tm_Log = NULL;
353
 
        }
354
 
}
355
 
 
356
 
void MSTransactionManager::flush()
357
 
{
358
 
        if (tm_Reader) 
359
 
                tm_Reader->flush();
360
 
}
361
 
 
362
 
void MSTransactionManager::suspend(bool do_flush)
363
 
{
364
 
        enter_();
365
 
        
366
 
        if (do_flush) 
367
 
                flush();
368
 
                
369
 
        if (tm_Reader) {
370
 
                tm_Reader->suspend();
371
 
        }       
372
 
        exit_();
373
 
}
374
 
 
375
 
void MSTransactionManager::resume()
376
 
{
377
 
        enter_();
378
 
        if (tm_Reader) {
379
 
                tm_Reader->resume();
380
 
        }       
381
 
        exit_();
382
 
}
383
 
 
384
 
void MSTransactionManager::commit()
385
 
{
386
 
        enter_();
387
 
        
388
 
        if (!tm_Log)
389
 
                startUpReader();
390
 
                
391
 
        self->myStmtCount = 0;
392
 
        self->myStartStmt = 0;
393
 
 
394
 
        tm_Log->txn_LogTransaction(MS_CommitTxn);
395
 
        
396
 
 
397
 
        exit_();
398
 
}
399
 
 
400
 
void MSTransactionManager::rollback()
401
 
{
402
 
        enter_();
403
 
        
404
 
        if (!tm_Log)
405
 
                startUpReader();
406
 
                
407
 
        self->myStmtCount = 0;
408
 
        self->myStartStmt = 0;
409
 
 
410
 
        tm_Log->txn_LogTransaction(MS_RollBackTxn);
411
 
 
412
 
        exit_();
413
 
}
414
 
 
415
 
class MSTransactionCheckPoint: public CSString
416
 
{
417
 
        public:
418
 
        MSTransactionCheckPoint(const char *name, uint32_t stmtCount ):CSString(name)
419
 
        {
420
 
                position = stmtCount;
421
 
        }
422
 
        
423
 
        uint32_t position;
424
 
};
425
 
 
426
 
#ifdef DRIZZLED
427
 
void MSTransactionManager::setSavepoint(const char *savePoint)
428
 
{
429
 
        MSTransactionCheckPoint *checkPoint;
430
 
        enter_();
431
 
        
432
 
        new_(checkPoint, MSTransactionCheckPoint(savePoint, self->myStmtCount));
433
 
        
434
 
        push_(checkPoint);
435
 
        self->mySavePoints.add(checkPoint);
436
 
        pop_(checkPoint);
437
 
        
438
 
        exit_();
439
 
}
440
 
 
441
 
void MSTransactionManager::releaseSavepoint(const char *savePoint)
442
 
{
443
 
        MSTransactionCheckPoint *checkPoint;
444
 
        CSString *name;
445
 
        enter_();
446
 
        
447
 
        name = CSString::newString(savePoint);
448
 
        push_(name);
449
 
 
450
 
        checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
451
 
        release_(name);
452
 
        
453
 
        if (checkPoint)                 
454
 
                self->mySavePoints.remove(checkPoint);
455
 
                
456
 
        exit_();
457
 
}
458
 
 
459
 
void MSTransactionManager::rollbackTo(const char *savePoint)
460
 
{
461
 
        MSTransactionCheckPoint *checkPoint;
462
 
        CSString *name;
463
 
        enter_();
464
 
        
465
 
        name = CSString::newString(savePoint);
466
 
        push_(name);
467
 
 
468
 
        checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
469
 
        release_(name);
470
 
        
471
 
        if (checkPoint) {
472
 
                uint32_t position = checkPoint->position;
473
 
                
474
 
                self->mySavePoints.remove(checkPoint);
475
 
                rollbackToPosition(position);
476
 
        }
477
 
                
478
 
        exit_();
479
 
}
480
 
#endif
481
 
 
482
 
void MSTransactionManager::rollbackToPosition(uint32_t position)
483
 
{
484
 
        enter_();
485
 
 
486
 
        ASSERT(self->myStmtCount > position);
487
 
        
488
 
        if (!tm_Log)
489
 
                startUpReader();
490
 
        tm_Log->txn_LogPartialRollBack(position);
491
 
        
492
 
        exit_();
493
 
}
494
 
 
495
 
void MSTransactionManager::dropDatabase(uint32_t db_id)
496
 
{
497
 
        enter_();
498
 
 
499
 
        if (!tm_Log)
500
 
                startUpReader();
501
 
        
502
 
        tm_Log->txn_dropDatabase(db_id);
503
 
 
504
 
        exit_();
505
 
}
506
 
 
507
 
void MSTransactionManager::logTransaction(bool ref, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
508
 
{
509
 
        enter_();
510
 
        
511
 
        if (!tm_Log)
512
 
                startUpReader();
513
 
 
514
 
        if (!self->myTID) {
515
 
                bool autocommit = false;
516
 
                autocommit = ms_is_autocommit();
517
 
#ifndef DRIZZLED
518
 
                if (!autocommit)
519
 
                        pbms_take_part_in_transaction(ms_my_get_thread());
520
 
#endif
521
 
                        
522
 
                self->myIsAutoCommit = autocommit;
523
 
        }
524
 
        
525
 
        // PBMS always explicitly commits
526
 
        tm_Log->txn_LogTransaction((ref)?MS_ReferenceTxn:MS_DereferenceTxn, false /*autocommit*/, db_id, tab_id, blob_id, blob_ref_id);
527
 
 
528
 
        self->myStmtCount++;
529
 
        
530
 
        exit_();
531
 
}
532
 
 
533