~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSThread.h

  • Committer: Brian Aker
  • Date: 2010-12-08 22:35:56 UTC
  • mfrom: (1819.9.158 update-innobase)
  • Revision ID: brian@tangent.org-20101208223556-37mi4omqg7lkjzf3
Merge in Stewart's changes, 1.3 changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 
18
 *
 
19
 * Original author: Paul McCullagh (H&G2JCtL)
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-05-20
 
23
 *
 
24
 * CORE SYSTEM:
 
25
 * A independently running thread.
 
26
 *
 
27
 */
 
28
 
 
29
#ifndef __CSTHREAD_H__
 
30
#define __CSTHREAD_H__
 
31
 
 
32
#include <pthread.h>
 
33
#include <setjmp.h>
 
34
 
 
35
#include "CSDefs.h"
 
36
#include "CSMutex.h"
 
37
#include "CSException.h"
 
38
#include "CSObject.h"
 
39
#include "CSStorage.h"
 
40
 
 
41
#define CS_THREAD_TYPE                          int
 
42
 
 
43
/* Types of threads: */
 
44
#define CS_ANY_THREAD                           0
 
45
#define CS_THREAD                                       1
 
46
 
 
47
typedef struct CSCallStack {
 
48
        const char*     cs_func;
 
49
        const char*     cs_file;
 
50
        int                     cs_line;
 
51
} CSCallStack, *CSCallStackPtr;
 
52
 
 
53
/* 
 
54
 * The release stack contains objects that need to be
 
55
 * released when an exception occurs.
 
56
 */
 
57
#define CS_RELEASE_OBJECT               1
 
58
#define CS_RELEASE_MUTEX                2
 
59
#define CS_RELEASE_POOLED               3
 
60
#define CS_RELEASE_MEM                  4
 
61
#define CS_RELEASE_OBJECT_PTR   5
 
62
 
 
63
typedef struct CSRelease {
 
64
        int                                             r_type;
 
65
        union {
 
66
                CSObject                        *r_object;                                      /* The object to be released. */
 
67
                CSMutex                         *r_mutex;                                       /* The mutex to be unlocked! */
 
68
                CSPooled                        *r_pooled;
 
69
                void                            *r_mem;
 
70
                CSObject                        **r_objectPtr;
 
71
        } x;
 
72
} CSReleaseRec, *CSReleasePtr;
 
73
 
 
74
typedef struct CSJumpBuf {
 
75
        CSReleasePtr                    jb_res_top;
 
76
        int                                             jb_call_top;
 
77
        jmp_buf                                 jb_buffer;
 
78
} CSJumpBufRec, *CSJumpBufPtr;
 
79
 
 
80
class CSThreadList: public CSLinkedList, public CSMutex {
 
81
public:
 
82
        CSThreadList():
 
83
                CSLinkedList(),
 
84
                CSMutex()
 
85
        {
 
86
        }
 
87
 
 
88
        virtual ~CSThreadList() {
 
89
                stopAllThreads();
 
90
        }
 
91
 
 
92
        /**
 
93
         * Send the given signal to all threads, except to self!
 
94
         */
 
95
        void signalAllThreads(int sig);
 
96
 
 
97
        void quitAllThreads();
 
98
 
 
99
        void stopAllThreads();
 
100
};
 
101
 
 
102
typedef void *(*ThreadRunFunc)();
 
103
 
 
104
class CSThread : public CSRefObject {
 
105
public:
 
106
        /* The name of the thread. */
 
107
        CSString                *threadName;
 
108
        CSThreadList    *myThreadList;                          /* The thread list that this thread belongs to. */
 
109
 
 
110
        /* If this value is non-zero, this signal is pending and
 
111
         * must be thrown.
 
112
         *
 
113
         * SIGTERM, SIGQUIT - Means the thread has been terminated.
 
114
         * SIGINT - Means the thread has been interrupted.
 
115
         *
 
116
         * When a signal is throw it clears this value. This includes
 
117
         * the case when system calls return error due to interrupt.
 
118
         */
 
119
        int                             signalPending;
 
120
        bool                    ignoreSignals;
 
121
 
 
122
        /* Set to true once the thread is running (never reset!). */
 
123
        bool                    isRunning;
 
124
 
 
125
        /* Set to true when the thread must quit (never reset!): */
 
126
        bool                    myMustQuit;     
 
127
        
 
128
        CSException             myException;
 
129
#if defined(MYSQL_SERVER) ||  defined(DRIZZLED)
 
130
 
 
131
        /* Set to true when this tread was initialized through the internal PBMS api. */
 
132
        /* When this is the case than it must only be freed via the API as well. */
 
133
        bool                    pbms_api_owner;
 
134
 
 
135
        /* Transaction references. */
 
136
#ifdef DRIZZLED
 
137
        CSSortedList    mySavePoints;
 
138
#endif
 
139
        uint32_t                myTID;                  // Current transaction ID
 
140
        uint32_t                myTransRef;             // Reference to the current transaction cache index
 
141
        bool                    myIsAutoCommit; // Is the current transaction in auto commit mode.
 
142
        uint32_t                myCacheVersion; // The last transaction cache version checked. Used during overflow.
 
143
        bool                    myStartTxn;             // A flag to indicate the start of a new transaction.
 
144
        uint32_t                myStmtCount;    // Counts the number of statements in the current transaction.
 
145
        uint32_t                myStartStmt;    // The myStmtCount at the start of the last logical statement. (An update is 2 statements but only 1 logical statement.)
 
146
        void                    *myInfo;
 
147
#endif
 
148
        
 
149
        /* The call stack */
 
150
        int                             callTop;
 
151
        CSCallStack             callStack[CS_CALL_STACK_SIZE];
 
152
 
 
153
        /* The long jump stack: */
 
154
        int                             jumpDepth;                                                      /* The current jump depth */
 
155
        CSJumpBufRec    jumpEnv[CS_JUMP_STACK_SIZE];            /* The process environment to be restored on exception */
 
156
 
 
157
        /* The release stack */
 
158
        CSReleasePtr    relTop;                                                         /* The top of the resource stack (reference next free space). */
 
159
        CSReleaseRec    relStack[CS_RELEASE_STACK_SIZE];        /* Temporary data to be freed if an exception occurs. */
 
160
 
 
161
        CSThread(CSThreadList *list):
 
162
                CSRefObject(),
 
163
                threadName(NULL),
 
164
                myThreadList(list),
 
165
                signalPending(0),
 
166
                ignoreSignals(false),
 
167
                isRunning(false),
 
168
                myMustQuit(false),
 
169
#if defined(MYSQL_SERVER) ||  defined(DRIZZLED)
 
170
                pbms_api_owner(false),
 
171
                myTID(0),
 
172
                myTransRef(0),
 
173
                myIsAutoCommit(true),
 
174
                myCacheVersion(0),
 
175
                myStartTxn(true),
 
176
                myStmtCount(0),
 
177
                myStartStmt(0),
 
178
                myInfo(NULL),
 
179
#endif
 
180
                callTop(0),
 
181
                jumpDepth(0),
 
182
                relTop(relStack),
 
183
                iIsMain(false),
 
184
                isDetached(false),
 
185
                iRunFunc(NULL),
 
186
                iNextLink(NULL),
 
187
                iPrevLink(NULL)
 
188
        {
 
189
        }
 
190
 
 
191
        virtual ~CSThread() {
 
192
                if (threadName)
 
193
                        threadName->release();
 
194
        }
 
195
 
 
196
    /**
 
197
     * Task to be performed by this thread.
 
198
         *
 
199
     * @exception CSSystemException thrown if thread cannot be run.
 
200
         */
 
201
        virtual void *run();
 
202
 
 
203
        /**
 
204
         * Start execution of the thread.
 
205
         *
 
206
     * @exception CSSystemException thrown if thread is invalid.
 
207
         */
 
208
        void start(bool detached = false);
 
209
 
 
210
        /*
 
211
         * Stop execution of the thread.
 
212
         */
 
213
        virtual void stop();
 
214
 
 
215
        /**
 
216
         * Wait for this thread to die.
 
217
         *
 
218
     * @exception CSSystemException thrown if thread is invalid.
 
219
         */
 
220
        void *join();
 
221
 
 
222
        /**
 
223
         * Signal the thread. Throws CSSystemException 
 
224
     * if the thread is invalid.
 
225
         */
 
226
        void signal(unsigned int);
 
227
 
 
228
        void setSignalPending(unsigned int);
 
229
 
 
230
        /**
 
231
         * Check to see if we have been interrupted by a signal
 
232
         * (i.e. there is a signal pending).
 
233
         * This function throws CSSignalException if
 
234
         * there is a signal pending.
 
235
         */
 
236
        void interrupted() { if (signalPending) throwSignal(); }
 
237
        void throwSignal();
 
238
 
 
239
        /* Log the stack to the specified depth along with the message. */
 
240
        void logStack(int depth, const char *msg);
 
241
 
 
242
        /* Log the exception, and the current stack. */
 
243
        void logException();
 
244
        
 
245
        /* Log the exception, and the current stack. */
 
246
        void logMessage();
 
247
        
 
248
        /*
 
249
         * Return true if this is the main thread.
 
250
         */
 
251
        bool isMain();
 
252
 
 
253
        /*
 
254
         * Throwing exceptions:
 
255
         */
 
256
        void releaseObjects(CSReleasePtr top);
 
257
        void throwException();
 
258
        void caught();
 
259
        bool isMe(CSThread *me) { return (pthread_equal(me->iThread,iThread) != 0);}
 
260
        /* Make this object linkable: */
 
261
        virtual CSObject *getNextLink() { return iNextLink; }
 
262
        virtual CSObject *getPrevLink() { return iPrevLink; }
 
263
        virtual void setNextLink(CSObject *link) { iNextLink = link; }
 
264
        virtual void setPrevLink(CSObject *link) { iPrevLink = link; }
 
265
 
 
266
        friend class CSDaemon;
 
267
 
 
268
private:
 
269
        pthread_t               iThread;
 
270
        bool                    iIsMain;
 
271
        bool                    isDetached;
 
272
        ThreadRunFunc   iRunFunc;
 
273
        CSObject                *iNextLink;
 
274
        CSObject                *iPrevLink;
 
275
 
 
276
        void addToList();
 
277
        void removeFromList();
 
278
 
 
279
public:
 
280
        /* Each thread stores is thread object in this key: */
 
281
        static pthread_key_t sThreadKey;
 
282
 
 
283
   /**
 
284
     * Put the currently executing thread to sleep for a given amount of
 
285
     * time.
 
286
     *
 
287
     * @param timeout maximum amount of time (milliseconds) this method could block
 
288
     *
 
289
     * @exception TDInterruptedException thrown if the threads sleep is interrupted
 
290
     *            before <i>timeout</i> milliseconds expire.
 
291
     */
 
292
        static void sleep(unsigned long timeout);
 
293
 
 
294
        /* Do static initialization and de-initialization. */
 
295
        static bool isUp;
 
296
        static bool startUp();
 
297
        static void shutDown();
 
298
 
 
299
        /* Attach and detach an already running thread: */
 
300
        static bool attach(CSThread *thread);
 
301
        static void detach(CSThread *thread);
 
302
 
 
303
        /**
 
304
         * Return the thread object of the current
 
305
         * thread.
 
306
         */
 
307
        static CSThread *getSelf();
 
308
        static bool setSelf(CSThread *self);
 
309
 
 
310
        static CSThread *newCSThread();
 
311
        static CSThread *newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list);
 
312
 
 
313
        /* called for a newly created thread. */
 
314
        static void *dispatch(void *arg);
 
315
 
 
316
};
 
317
 
 
318
class CSDaemon : public CSThread, public CSSync {
 
319
public:
 
320
        time_t                  myWaitTime;                                     /* Wait time in milli-seconds */
 
321
 
 
322
        CSDaemon(time_t wait_time, CSThreadList *list);
 
323
        CSDaemon(CSThreadList *list);
 
324
        virtual ~CSDaemon() { }
 
325
 
 
326
        virtual void *run();
 
327
 
 
328
        /* Return false if startup failed, and the thread must quit. */
 
329
        virtual bool initializeWork() { return true; };
 
330
 
 
331
        /* Return true of the thread should sleep before doing more work. */
 
332
        virtual bool doWork();
 
333
 
 
334
        virtual void *completeWork() { return NULL; };
 
335
 
 
336
        /* Return false if the excpetion is not handled and the thread must quit.
 
337
         * Set must_sleep to true of the thread should pause before doing work
 
338
         * again.
 
339
         */
 
340
        virtual bool handleException();
 
341
 
 
342
        virtual void stop();
 
343
 
 
344
        void wakeup();
 
345
 
 
346
        void suspend();
 
347
 
 
348
        bool isSuspend() { return (iSuspendCount != 0);} // Don't use iSuspended, we are interested in if suspend() was called.
 
349
 
 
350
        void resume();
 
351
 
 
352
        virtual void returnToPool() {
 
353
                resume();
 
354
                release();
 
355
        }
 
356
 
 
357
        void suspended();
 
358
 
 
359
        void suspendedWait();
 
360
 
 
361
        void suspendedWait(time_t milli_sec);
 
362
 
 
363
private:
 
364
        void            try_Run(CSThread *self, const bool must_sleep);
 
365
        bool            iSuspended;
 
366
        uint32_t        iSuspendCount;
 
367
};
 
368
 
 
369
#endif