~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/transaction_ms.cc

  • Committer: Monty Taylor
  • Date: 2008-10-09 22:38:27 UTC
  • mto: This revision was merged to the branch mainline in revision 497.
  • Revision ID: monty@inaugust.com-20081009223827-bc9gvpiplsmvpwyq
Moved test() to its own file.
Made a new function to possibly replace int10_to_str.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2009-07-09
22
 
 *
23
 
 * H&G2JCtL
24
 
 *
25
 
 * PBMS transaction daemon.
26
 
 *
27
 
 *
28
 
 */
29
 
 
30
 
#include "cslib/CSConfig.h"
31
 
#include <inttypes.h>
32
 
 
33
 
#include "cslib/CSGlobal.h"
34
 
#include "cslib/CSStrUtil.h"
35
 
#include "cslib/CSLog.h"
36
 
 
37
 
#include "defs_ms.h"
38
 
#include "mysql_ms.h"
39
 
#include "open_table_ms.h"
40
 
#include "trans_log_ms.h"
41
 
#include "transaction_ms.h"
42
 
#include "pbmsdaemon_ms.h"
43
 
 
44
 
/*
45
 
 * The pbms_ functions are utility functions supplied by ha_pbms.cc
46
 
 */
47
 
void    pbms_take_part_in_transaction(void *thread);
48
 
 
49
 
MSTrans *MSTransactionManager::tm_Log;
50
 
MSTransactionThread *MSTransactionManager::tm_Reader;
51
 
 
52
 
 
53
 
typedef  struct {
54
 
        CSDiskValue4    lr_time_4;              // The database ID for the operation.
55
 
        CSDiskValue1    lr_state_1;             // The transaction state. 
56
 
        CSDiskValue1    lr_type_1;              // The transaction type. If the first bit is set then the transaction is an autocommit.
57
 
        CSDiskValue4    lr_db_id_4;             // The database ID for the operation.
58
 
        CSDiskValue4    lr_tab_id_4;    // The table ID for the operation.
59
 
        CSDiskValue8    lr_blob_id_8;   // The blob ID for the operation.
60
 
        CSDiskValue8    lr_blob_ref_id_8;// The blob reference id.
61
 
} MSDiskLostRec, *MSDiskLostPtr;
62
 
 
63
 
/*
64
 
 * ---------------------------------------------------------------
65
 
 * The transaction reader thread 
66
 
 */
67
 
 
68
 
class MSTransactionThread : public CSDaemon {
69
 
public:
70
 
        MSTransactionThread(MSTrans *txn_log);
71
 
        
72
 
        virtual ~MSTransactionThread(){} // Do nothing here because 'self' will no longer be valid, use completeWork().
73
 
 
74
 
 
75
 
        void close();
76
 
 
77
 
        virtual bool doWork();
78
 
 
79
 
        virtual void *completeWork();
80
 
        
81
 
        void flush();
82
 
        
83
 
        bool trt_is_ready;
84
 
private:
85
 
        void reportLostReference(MSTransPtr rec, MS_TxnState state);
86
 
        void dereference(MSTransPtr rec, MS_TxnState state);
87
 
        void commitReference(MSTransPtr rec, MS_TxnState state);
88
 
        
89
 
        MSTrans *trt_log;
90
 
        CSFile  *trt_lostLog;
91
 
 
92
 
};
93
 
 
94
 
MSTransactionThread::MSTransactionThread(MSTrans *txn_log):
95
 
CSDaemon(0, NULL),
96
 
trt_is_ready(false),
97
 
trt_log(txn_log),
98
 
trt_lostLog(NULL)
99
 
{
100
 
        trt_log->txn_SetReader(this);
101
 
}
102
 
 
103
 
void MSTransactionThread::close()
104
 
{
105
 
        if (trt_lostLog)
106
 
                trt_lostLog->close();
107
 
}
108
 
 
109
 
void MSTransactionThread::reportLostReference(MSTransPtr rec, MS_TxnState state)
110
 
{
111
 
        MSDiskLostRec lrec;
112
 
        const char *t_txt, *s_txt;
113
 
        char b1[16], b2[16], msg[100];
114
 
        
115
 
        //if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == true)
116
 
                //return;
117
 
 
118
 
        enter_();
119
 
        
120
 
        switch (state) {
121
 
                case MS_Committed:
122
 
                        s_txt = "Commit";
123
 
                        break;
124
 
                case MS_RolledBack:
125
 
                        s_txt = "RolledBack";
126
 
                        break;
127
 
                case MS_Recovered:
128
 
                        s_txt = "Recovered";
129
 
                        break;
130
 
                case MS_Running:
131
 
                        s_txt = "Running";
132
 
                        break;
133
 
                default:
134
 
                        snprintf(b1, 16, "(%d)?", state);
135
 
                        s_txt = b1;
136
 
        }
137
 
 
138
 
        switch (TRANS_TYPE(rec->tr_type)) {
139
 
                case MS_DereferenceTxn:
140
 
                        t_txt = "Dereference";
141
 
                        break;
142
 
                case MS_ReferenceTxn:
143
 
                        t_txt = "Reference";
144
 
                        break;
145
 
                default:
146
 
                        snprintf(b2, 16, "(%x)?", rec->tr_type);
147
 
                        t_txt = b2;
148
 
        }
149
 
 
150
 
        snprintf(msg, 100, "Lost PBMS record: %s %s db_id: %"PRIu32" tab_id: %"PRIu32" blob_id: %"PRIu64"", s_txt, t_txt, rec->tr_db_id, rec->tr_tab_id, rec->tr_blob_id);
151
 
        CSL.logLine(self, CSLog::Warning, msg);
152
 
 
153
 
        CS_SET_DISK_4(lrec.lr_time_4, time(NULL));
154
 
        CS_SET_DISK_1(lrec.lr_state_1, state);
155
 
        CS_SET_DISK_1(lrec.lr_type_1, rec->tr_type);
156
 
        CS_SET_DISK_4(lrec.lr_db_id_4, rec->tr_db_id);
157
 
        CS_SET_DISK_4(lrec.lr_tab_id_4, rec->tr_tab_id);
158
 
        CS_SET_DISK_8(lrec.lr_blob_id_8, rec->tr_blob_id);
159
 
        CS_SET_DISK_8(lrec.lr_blob_ref_id_8, rec->tr_blob_ref_id);
160
 
        
161
 
        if (!trt_lostLog) {
162
 
                CSPath *path;
163
 
                char *str = cs_strdup(trt_log->txn_GetTXNLogPath());
164
 
                cs_remove_last_name_of_path(str);
165
 
                
166
 
                path = CSPath::newPath(str, "pbms_lost_txn.dat");
167
 
                cs_free(str);
168
 
                
169
 
                trt_lostLog = CSFile::newFile(path);
170
 
                trt_lostLog->open(CSFile::CREATE);
171
 
        }
172
 
        trt_lostLog->write(&lrec, trt_lostLog->getEOF(), sizeof(MSDiskLostRec));
173
 
        trt_lostLog->sync();
174
 
        exit_();
175
 
        
176
 
}
177
 
 
178
 
void MSTransactionThread::dereference(MSTransPtr rec, MS_TxnState state)
179
 
{
180
 
        MSOpenTable             *otab;
181
 
        enter_();
182
 
        
183
 
        try_(a) {
184
 
                otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
185
 
                frompool_(otab);
186
 
                otab->freeReference(rec->tr_blob_id, rec->tr_blob_ref_id);
187
 
                backtopool_(otab);
188
 
        }
189
 
        
190
 
        catch_(a) {
191
 
                reportLostReference(rec, state);
192
 
        }
193
 
        
194
 
        cont_(a);       
195
 
        exit_();
196
 
}
197
 
 
198
 
void MSTransactionThread::commitReference(MSTransPtr rec, MS_TxnState state)
199
 
{
200
 
        MSOpenTable             *otab;
201
 
        enter_();
202
 
        
203
 
        try_(a) {
204
 
                otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
205
 
                frompool_(otab);
206
 
                otab->commitReference(rec->tr_blob_id, rec->tr_blob_ref_id);
207
 
                backtopool_(otab);
208
 
        }
209
 
        
210
 
        catch_(a) {
211
 
                reportLostReference(rec, state);
212
 
        }
213
 
        
214
 
        cont_(a);       
215
 
        exit_();
216
 
}
217
 
 
218
 
void MSTransactionThread::flush()
219
 
{
220
 
        enter_();
221
 
        
222
 
        // For now I just wait until the transaction queue is empty or
223
 
        // the transaction at the head of the queue has not yet been
224
 
        // committed.
225
 
        //
226
 
        // What needs to be done is for the transaction log to scan 
227
 
        // past the non commited transaction to see if there are any
228
 
        // other committed transaction in the log and apply them if found.
229
 
        
230
 
        wakeup(); // Incase the reader is sleeping.
231
 
        while (trt_log->txn_haveNextTransaction() && !isSuspend() && self->myMustQuit)
232
 
                self->sleep(10);                
233
 
        exit_();
234
 
}
235
 
 
236
 
bool MSTransactionThread::doWork()
237
 
{
238
 
        MSTransRec rec = {0,0,0,0,0,0,0};
239
 
        MS_TxnState state;
240
 
        enter_();
241
 
        
242
 
        try_(a) {
243
 
                while (!myMustQuit) {
244
 
                        // This will sleep while waiting for the next 
245
 
                        // completed transaction.
246
 
                        trt_log->txn_GetNextTransaction(&rec, &state); 
247
 
                        if (myMustQuit)
248
 
                                break;
249
 
                                
250
 
                        if (rec.tr_db_id == 0) // The database was dropped.
251
 
                                continue;
252
 
                                
253
 
                        if (state == MS_Committed){
254
 
                                if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) 
255
 
                                        dereference(&rec, state);
256
 
                                else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
257
 
                                        commitReference(&rec, state);
258
 
 
259
 
                        } else if (state == MS_RolledBack) { 
260
 
                                // There is nothing to do on rollback of a dereference.
261
 
                                
262
 
                                if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
263
 
                                        dereference(&rec, state);
264
 
                                        
265
 
                        } else if (state == MS_Recovered) { 
266
 
                                if ((TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn))
267
 
                                        reportLostReference(&rec, state); // Report these even though they may not be lost.
268
 
                                
269
 
                                // Because of the 2 phase commit issue with other engines I cannot
270
 
                                // just roll back the transaction because it may have been committed 
271
 
                                // on the master engine. So to be safe I will always err on the side
272
 
                                // of having unreference BLOBs in the repository rather than risking
273
 
                                // deleting a BLOB that was referenced. To this end I will commit references
274
 
                                // while ignoring (rolling back) dereferences.
275
 
                                if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
276
 
                                        commitReference(&rec, state);
277
 
                                
278
 
                        }
279
 
                }
280
 
        }
281
 
        
282
 
        catch_(a) {
283
 
                self->logException();
284
 
                CSL.logLine(NULL, CSLog::Error, "!!!!!!!! THE PBMS TRANSACTION LOG READER DIED! !!!!!!!!!!!");
285
 
        }
286
 
        cont_(a);
287
 
        return_(true);
288
 
}
289
 
 
290
 
void *MSTransactionThread::completeWork()
291
 
{
292
 
        close();
293
 
        
294
 
        if (trt_log)
295
 
                trt_log->release();
296
 
                
297
 
        if (trt_lostLog)
298
 
                trt_lostLog->release();
299
 
        return NULL;
300
 
}
301
 
 
302
 
/*
303
 
 * ---------------------------------------------------------------
304
 
 * The transaction log manager 
305
 
 */
306
 
void MSTransactionManager::startUpReader()
307
 
{
308
 
        char pbms_path[PATH_MAX];
309
 
        enter_();
310
 
        
311
 
        cs_strcpy(PATH_MAX, pbms_path, PBMSDaemon::getPBMSDir()); 
312
 
        cs_add_name_to_path(PATH_MAX, pbms_path, "ms-trans-log.dat");
313
 
        
314
 
        tm_Log = MSTrans::txn_NewMSTrans(pbms_path);
315
 
        new_(tm_Reader, MSTransactionThread(RETAIN(tm_Log)));
316
 
 
317
 
        tm_Reader->start();
318
 
        
319
 
        // Wait for the transaction reader to recover any old transaction:
320
 
        tm_Reader->flush();
321
 
                
322
 
        exit_();
323
 
}
324
 
 
325
 
void MSTransactionManager::startUp()
326
 
{
327
 
        CSPath *path = NULL;
328
 
        enter_();
329
 
        
330
 
        // Do not start the reader if the pbms dir doesn't exist.
331
 
        path = CSPath::newPath(PBMSDaemon::getPBMSDir());
332
 
        push_(path);
333
 
        if (path->exists()) {
334
 
                startUpReader();
335
 
        }
336
 
        release_(path);
337
 
        
338
 
        exit_();
339
 
}
340
 
 
341
 
void MSTransactionManager::shutDown()
342
 
{
343
 
        if (tm_Reader) {
344
 
                tm_Reader->stop();
345
 
                tm_Reader->release();
346
 
                tm_Reader = NULL;
347
 
        }
348
 
        if (tm_Log) {
349
 
                tm_Log->release();
350
 
                tm_Log = NULL;
351
 
        }
352
 
}
353
 
 
354
 
void MSTransactionManager::flush()
355
 
{
356
 
        if (tm_Reader) 
357
 
                tm_Reader->flush();
358
 
}
359
 
 
360
 
void MSTransactionManager::suspend(bool do_flush)
361
 
{
362
 
        enter_();
363
 
        
364
 
        if (do_flush) 
365
 
                flush();
366
 
                
367
 
        if (tm_Reader) {
368
 
                tm_Reader->suspend();
369
 
        }       
370
 
        exit_();
371
 
}
372
 
 
373
 
void MSTransactionManager::resume()
374
 
{
375
 
        enter_();
376
 
        if (tm_Reader) {
377
 
                tm_Reader->resume();
378
 
        }       
379
 
        exit_();
380
 
}
381
 
 
382
 
void MSTransactionManager::commit()
383
 
{
384
 
        enter_();
385
 
        
386
 
        if (!tm_Log)
387
 
                startUpReader();
388
 
                
389
 
        self->myStmtCount = 0;
390
 
        self->myStartStmt = 0;
391
 
 
392
 
        tm_Log->txn_LogTransaction(MS_CommitTxn);
393
 
        
394
 
 
395
 
        exit_();
396
 
}
397
 
 
398
 
void MSTransactionManager::rollback()
399
 
{
400
 
        enter_();
401
 
        
402
 
        if (!tm_Log)
403
 
                startUpReader();
404
 
                
405
 
        self->myStmtCount = 0;
406
 
        self->myStartStmt = 0;
407
 
 
408
 
        tm_Log->txn_LogTransaction(MS_RollBackTxn);
409
 
 
410
 
        exit_();
411
 
}
412
 
 
413
 
class MSTransactionCheckPoint: public CSCString
414
 
{
415
 
        public:
416
 
        MSTransactionCheckPoint(const char *name, uint32_t stmtCount ):CSCString()
417
 
        {
418
 
                myCString = cs_strdup(name);
419
 
                myStrLen = strlen(name);
420
 
 
421
 
                position = stmtCount;
422
 
        }
423
 
        
424
 
        uint32_t position;
425
 
};
426
 
 
427
 
#ifdef DRIZZLED
428
 
void MSTransactionManager::setSavepoint(const char *savePoint)
429
 
{
430
 
        MSTransactionCheckPoint *checkPoint;
431
 
        enter_();
432
 
        
433
 
        new_(checkPoint, MSTransactionCheckPoint(savePoint, self->myStmtCount));
434
 
        
435
 
        push_(checkPoint);
436
 
        self->mySavePoints.add(checkPoint);
437
 
        pop_(checkPoint);
438
 
        
439
 
        exit_();
440
 
}
441
 
 
442
 
void MSTransactionManager::releaseSavepoint(const char *savePoint)
443
 
{
444
 
        MSTransactionCheckPoint *checkPoint;
445
 
        CSCString *name;
446
 
        enter_();
447
 
        
448
 
        name = CSCString::newString(savePoint);
449
 
        push_(name);
450
 
 
451
 
        checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
452
 
        release_(name);
453
 
        
454
 
        if (checkPoint)                 
455
 
                self->mySavePoints.remove(checkPoint);
456
 
                
457
 
        exit_();
458
 
}
459
 
 
460
 
void MSTransactionManager::rollbackTo(const char *savePoint)
461
 
{
462
 
        MSTransactionCheckPoint *checkPoint;
463
 
        CSCString *name;
464
 
        enter_();
465
 
        
466
 
        name = CSCString::newString(savePoint);
467
 
        push_(name);
468
 
 
469
 
        checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
470
 
        release_(name);
471
 
        
472
 
        if (checkPoint) {
473
 
                uint32_t position = checkPoint->position;
474
 
                
475
 
                self->mySavePoints.remove(checkPoint);
476
 
                rollbackToPosition(position);
477
 
        }
478
 
                
479
 
        exit_();
480
 
}
481
 
#endif
482
 
 
483
 
void MSTransactionManager::rollbackToPosition(uint32_t position)
484
 
{
485
 
        enter_();
486
 
 
487
 
        ASSERT(self->myStmtCount > position);
488
 
        
489
 
        if (!tm_Log)
490
 
                startUpReader();
491
 
        tm_Log->txn_LogPartialRollBack(position);
492
 
        
493
 
        exit_();
494
 
}
495
 
 
496
 
void MSTransactionManager::dropDatabase(uint32_t db_id)
497
 
{
498
 
        enter_();
499
 
 
500
 
        if (!tm_Log)
501
 
                startUpReader();
502
 
        
503
 
        tm_Log->txn_dropDatabase(db_id);
504
 
 
505
 
        exit_();
506
 
}
507
 
 
508
 
void MSTransactionManager::logTransaction(bool ref, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
509
 
{
510
 
        enter_();
511
 
        
512
 
        if (!tm_Log)
513
 
                startUpReader();
514
 
 
515
 
        if (!self->myTID) {
516
 
                bool autocommit = false;
517
 
                autocommit = ms_is_autocommit();
518
 
#ifndef DRIZZLED
519
 
                if (!autocommit)
520
 
                        pbms_take_part_in_transaction(ms_my_get_thread());
521
 
#endif
522
 
                        
523
 
                self->myIsAutoCommit = autocommit;
524
 
        }
525
 
        
526
 
        // PBMS always explicitly commits
527
 
        tm_Log->txn_LogTransaction((ref)?MS_ReferenceTxn:MS_DereferenceTxn, false /*autocommit*/, db_id, tab_id, blob_id, blob_ref_id);
528
 
 
529
 
        self->myStmtCount++;
530
 
        
531
 
        exit_();
532
 
}
533
 
 
534