~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSThread.h

This patch completes the first step in the splitting of
the XA resource manager API from the storage engine API,
as outlined in the specification here:

http://drizzle.org/wiki/XaStorageEngine

* Splits plugin::StorageEngine into a base StorageEngine
  class and two derived classes, TransactionalStorageEngine
  and XaStorageEngine.  XaStorageEngine derives from
  TransactionalStorageEngine and creates the XA Resource
  Manager API for storage engines.

  - The methods moved from StorageEngine to TransactionalStorageEngine
    include releaseTemporaryLatches(), startConsistentSnapshot(), 
    commit(), rollback(), setSavepoint(), releaseSavepoint(),
    rollbackToSavepoint() and hasTwoPhaseCommit()
  - The methods moved from StorageEngine to XaStorageEngine
    include recover(), commitXid(), rollbackXid(), and prepare()

* Places all static "EngineVector"s into their proper
  namespaces (typedefs belong in header files, not implementation files)
  and places all static methods corresponding
  to either only transactional engines or only XA engines
  into their respective files in /drizzled/plugin/

* Modifies the InnoDB "handler" files to extend plugin::XaStorageEngine
  and not plugin::StorageEngine

The next step, as outlined in the wiki spec page above, is to isolate
the XA Resource Manager API into its own plugin class and modify
plugin::XaStorageEngine to implement plugin::XaResourceManager via
composition.  This is necessary to enable building plugins which can
participate in an XA transaction *without having to have that plugin
implement the entire storage engine API*

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original author: Paul McCullagh (H&G2JCtL)
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-05-20
23
 
 *
24
 
 * CORE SYSTEM:
25
 
 * A independently running thread.
26
 
 *
27
 
 */
28
 
 
29
 
#ifndef __CSTHREAD_H__
30
 
#define __CSTHREAD_H__
31
 
 
32
 
#include <pthread.h>
33
 
#include <setjmp.h>
34
 
 
35
 
#include "CSDefs.h"
36
 
#include "CSMutex.h"
37
 
#include "CSException.h"
38
 
#include "CSObject.h"
39
 
#include "CSStorage.h"
40
 
 
41
 
#define CS_THREAD_TYPE                          int
42
 
 
43
 
/* Types of threads: */
44
 
#define CS_ANY_THREAD                           0
45
 
#define CS_THREAD                                       1
46
 
 
47
 
typedef struct CSCallStack {
48
 
        const char*     cs_func;
49
 
        const char*     cs_file;
50
 
        int                     cs_line;
51
 
} CSCallStack, *CSCallStackPtr;
52
 
 
53
 
/* 
54
 
 * The release stack contains objects that need to be
55
 
 * released when an exception occurs.
56
 
 */
57
 
#define CS_RELEASE_OBJECT               1
58
 
#define CS_RELEASE_MUTEX                2
59
 
#define CS_RELEASE_POOLED               3
60
 
#define CS_RELEASE_MEM                  4
61
 
#define CS_RELEASE_OBJECT_PTR   5
62
 
 
63
 
typedef struct CSRelease {
64
 
        int                                             r_type;
65
 
        union {
66
 
                CSObject                        *r_object;                                      /* The object to be released. */
67
 
                CSMutex                         *r_mutex;                                       /* The mutex to be unlocked! */
68
 
                CSPooled                        *r_pooled;
69
 
                void                            *r_mem;
70
 
                CSObject                        **r_objectPtr;
71
 
        } x;
72
 
} CSReleaseRec, *CSReleasePtr;
73
 
 
74
 
typedef struct CSJumpBuf {
75
 
        CSReleasePtr                    jb_res_top;
76
 
        int                                             jb_call_top;
77
 
        jmp_buf                                 jb_buffer;
78
 
} CSJumpBufRec, *CSJumpBufPtr;
79
 
 
80
 
class CSThreadList: public CSLinkedList, public CSMutex {
81
 
public:
82
 
        CSThreadList():
83
 
                CSLinkedList(),
84
 
                CSMutex()
85
 
        {
86
 
        }
87
 
 
88
 
        virtual ~CSThreadList() {
89
 
                stopAllThreads();
90
 
        }
91
 
 
92
 
        /**
93
 
         * Send the given signal to all threads, except to self!
94
 
         */
95
 
        void signalAllThreads(int sig);
96
 
 
97
 
        void quitAllThreads();
98
 
 
99
 
        void stopAllThreads();
100
 
};
101
 
 
102
 
typedef void *(*ThreadRunFunc)();
103
 
 
104
 
class CSThread : public CSRefObject {
105
 
public:
106
 
        /* The name of the thread. */
107
 
        CSString                *threadName;
108
 
        CSThreadList    *myThreadList;                          /* The thread list that this thread belongs to. */
109
 
 
110
 
        /* If this value is non-zero, this signal is pending and
111
 
         * must be thrown.
112
 
         *
113
 
         * SIGTERM, SIGQUIT - Means the thread has been terminated.
114
 
         * SIGINT - Means the thread has been interrupted.
115
 
         *
116
 
         * When a signal is throw it clears this value. This includes
117
 
         * the case when system calls return error due to interrupt.
118
 
         */
119
 
        int                             signalPending;
120
 
        bool                    ignoreSignals;
121
 
 
122
 
        /* Set to true once the thread is running (never reset!). */
123
 
        bool                    isRunning;
124
 
 
125
 
        /* Set to true when the thread must quit (never reset!): */
126
 
        bool                    myMustQuit;     
127
 
        
128
 
        CSException             myException;
129
 
#if defined(MYSQL_SERVER) ||  defined(DRIZZLED)
130
 
 
131
 
        /* Set to true when this tread was initialized through the internal PBMS api. */
132
 
        /* When this is the case than it must only be freed via the API as well. */
133
 
        bool                    pbms_api_owner;
134
 
 
135
 
        /* Transaction references. */
136
 
#ifdef DRIZZLED
137
 
        CSSortedList    mySavePoints;
138
 
#endif
139
 
        uint32_t                myTID;                  // Current transaction ID
140
 
        uint32_t                myTransRef;             // Reference to the current transaction cache index
141
 
        bool                    myIsAutoCommit; // Is the current transaction in auto commit mode.
142
 
        uint32_t                myCacheVersion; // The last transaction cache version checked. Used during overflow.
143
 
        bool                    myStartTxn;             // A flag to indicate the start of a new transaction.
144
 
        uint32_t                myStmtCount;    // Counts the number of statements in the current transaction.
145
 
        uint32_t                myStartStmt;    // The myStmtCount at the start of the last logical statement. (An update is 2 statements but only 1 logical statement.)
146
 
        void                    *myInfo;
147
 
#endif
148
 
        
149
 
        /* The call stack */
150
 
        int                             callTop;
151
 
        CSCallStack             callStack[CS_CALL_STACK_SIZE];
152
 
 
153
 
        /* The long jump stack: */
154
 
        int                             jumpDepth;                                                      /* The current jump depth */
155
 
        CSJumpBufRec    jumpEnv[CS_JUMP_STACK_SIZE];            /* The process environment to be restored on exception */
156
 
 
157
 
        /* The release stack */
158
 
        CSReleasePtr    relTop;                                                         /* The top of the resource stack (reference next free space). */
159
 
        CSReleaseRec    relStack[CS_RELEASE_STACK_SIZE];        /* Temporary data to be freed if an exception occurs. */
160
 
 
161
 
        CSThread(CSThreadList *list):
162
 
                CSRefObject(),
163
 
                threadName(NULL),
164
 
                myThreadList(list),
165
 
                signalPending(0),
166
 
                ignoreSignals(false),
167
 
                isRunning(false),
168
 
                myMustQuit(false),
169
 
#if defined(MYSQL_SERVER) ||  defined(DRIZZLED)
170
 
                pbms_api_owner(false),
171
 
                myTID(0),
172
 
                myTransRef(0),
173
 
                myIsAutoCommit(true),
174
 
                myCacheVersion(0),
175
 
                myStartTxn(true),
176
 
                myStmtCount(0),
177
 
                myStartStmt(0),
178
 
                myInfo(NULL),
179
 
#endif
180
 
                callTop(0),
181
 
                jumpDepth(0),
182
 
                relTop(relStack),
183
 
                iIsMain(false),
184
 
                isDetached(false),
185
 
                iRunFunc(NULL),
186
 
                iNextLink(NULL),
187
 
                iPrevLink(NULL)
188
 
        {
189
 
        }
190
 
 
191
 
        virtual ~CSThread() {
192
 
                if (threadName)
193
 
                        threadName->release();
194
 
        }
195
 
 
196
 
    /**
197
 
     * Task to be performed by this thread.
198
 
         *
199
 
     * @exception CSSystemException thrown if thread cannot be run.
200
 
         */
201
 
        virtual void *run();
202
 
 
203
 
        /**
204
 
         * Start execution of the thread.
205
 
         *
206
 
     * @exception CSSystemException thrown if thread is invalid.
207
 
         */
208
 
        void start(bool detached = false);
209
 
 
210
 
        /*
211
 
         * Stop execution of the thread.
212
 
         */
213
 
        virtual void stop();
214
 
 
215
 
        /**
216
 
         * Wait for this thread to die.
217
 
         *
218
 
     * @exception CSSystemException thrown if thread is invalid.
219
 
         */
220
 
        void *join();
221
 
 
222
 
        /**
223
 
         * Signal the thread. Throws CSSystemException 
224
 
     * if the thread is invalid.
225
 
         */
226
 
        void signal(unsigned int);
227
 
 
228
 
        void setSignalPending(unsigned int);
229
 
 
230
 
        /**
231
 
         * Check to see if we have been interrupted by a signal
232
 
         * (i.e. there is a signal pending).
233
 
         * This function throws CSSignalException if
234
 
         * there is a signal pending.
235
 
         */
236
 
        void interrupted() { if (signalPending) throwSignal(); }
237
 
        void throwSignal();
238
 
 
239
 
        /* Log the stack to the specified depth along with the message. */
240
 
        void logStack(int depth, const char *msg);
241
 
 
242
 
        /* Log the exception, and the current stack. */
243
 
        void logException();
244
 
        
245
 
        /* Log the exception, and the current stack. */
246
 
        void logMessage();
247
 
        
248
 
        /*
249
 
         * Return true if this is the main thread.
250
 
         */
251
 
        bool isMain();
252
 
 
253
 
        /*
254
 
         * Throwing exceptions:
255
 
         */
256
 
        void releaseObjects(CSReleasePtr top);
257
 
        void throwException();
258
 
        void caught();
259
 
        bool isMe(CSThread *me) { return (pthread_equal(me->iThread,iThread) != 0);}
260
 
        /* Make this object linkable: */
261
 
        virtual CSObject *getNextLink() { return iNextLink; }
262
 
        virtual CSObject *getPrevLink() { return iPrevLink; }
263
 
        virtual void setNextLink(CSObject *link) { iNextLink = link; }
264
 
        virtual void setPrevLink(CSObject *link) { iPrevLink = link; }
265
 
 
266
 
        friend class CSDaemon;
267
 
 
268
 
private:
269
 
        pthread_t               iThread;
270
 
        bool                    iIsMain;
271
 
        bool                    isDetached;
272
 
        ThreadRunFunc   iRunFunc;
273
 
        CSObject                *iNextLink;
274
 
        CSObject                *iPrevLink;
275
 
 
276
 
        void addToList();
277
 
        void removeFromList();
278
 
 
279
 
public:
280
 
        /* Each thread stores is thread object in this key: */
281
 
        static pthread_key_t sThreadKey;
282
 
 
283
 
   /**
284
 
     * Put the currently executing thread to sleep for a given amount of
285
 
     * time.
286
 
     *
287
 
     * @param timeout maximum amount of time (milliseconds) this method could block
288
 
     *
289
 
     * @exception TDInterruptedException thrown if the threads sleep is interrupted
290
 
     *            before <i>timeout</i> milliseconds expire.
291
 
     */
292
 
        static void sleep(unsigned long timeout);
293
 
 
294
 
        /* Do static initialization and de-initialization. */
295
 
        static bool isUp;
296
 
        static bool startUp();
297
 
        static void shutDown();
298
 
 
299
 
        /* Attach and detach an already running thread: */
300
 
        static bool attach(CSThread *thread);
301
 
        static void detach(CSThread *thread);
302
 
 
303
 
        /**
304
 
         * Return the thread object of the current
305
 
         * thread.
306
 
         */
307
 
        static CSThread *getSelf();
308
 
        static bool setSelf(CSThread *self);
309
 
 
310
 
        static CSThread *newCSThread();
311
 
        static CSThread *newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list);
312
 
 
313
 
        /* called for a newly created thread. */
314
 
        static void *dispatch(void *arg);
315
 
 
316
 
};
317
 
 
318
 
class CSDaemon : public CSThread, public CSSync {
319
 
public:
320
 
        time_t                  myWaitTime;                                     /* Wait time in milli-seconds */
321
 
 
322
 
        CSDaemon(time_t wait_time, CSThreadList *list);
323
 
        CSDaemon(CSThreadList *list);
324
 
        virtual ~CSDaemon() { }
325
 
 
326
 
        virtual void *run();
327
 
 
328
 
        /* Return false if startup failed, and the thread must quit. */
329
 
        virtual bool initializeWork() { return true; };
330
 
 
331
 
        /* Return true of the thread should sleep before doing more work. */
332
 
        virtual bool doWork();
333
 
 
334
 
        virtual void *completeWork() { return NULL; };
335
 
 
336
 
        /* Return false if the excpetion is not handled and the thread must quit.
337
 
         * Set must_sleep to true of the thread should pause before doing work
338
 
         * again.
339
 
         */
340
 
        virtual bool handleException();
341
 
 
342
 
        virtual void stop();
343
 
 
344
 
        void wakeup();
345
 
 
346
 
        void suspend();
347
 
 
348
 
        bool isSuspend() { return (iSuspendCount != 0);} // Don't use iSuspended, we are interested in if suspend() was called.
349
 
 
350
 
        void resume();
351
 
 
352
 
        virtual void returnToPool() {
353
 
                resume();
354
 
                release();
355
 
        }
356
 
 
357
 
        void suspended();
358
 
 
359
 
        void suspendedWait();
360
 
 
361
 
        void suspendedWait(time_t milli_sec);
362
 
 
363
 
private:
364
 
        void            try_Run(CSThread *self, const bool must_sleep);
365
 
        bool            iSuspended;
366
 
        uint32_t        iSuspendCount;
367
 
};
368
 
 
369
 
#endif