~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSThread.h

  • Committer: Brian Aker
  • Date: 2008-12-15 19:32:58 UTC
  • mfrom: (677.1.2 devel)
  • Revision ID: brian@tangent.org-20081215193258-fsvc1sh9h7a9sb1t
Merge from Monty

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original author: Paul McCullagh (H&G2JCtL)
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-05-20
23
 
 *
24
 
 * CORE SYSTEM:
25
 
 * A independently running thread.
26
 
 *
27
 
 */
28
 
 
29
 
#ifndef __CSTHREAD_H__
30
 
#define __CSTHREAD_H__
31
 
 
32
 
#include <pthread.h>
33
 
#include <setjmp.h>
34
 
 
35
 
#include "CSDefs.h"
36
 
#include "CSMutex.h"
37
 
#include "CSException.h"
38
 
#include "CSObject.h"
39
 
#include "CSStorage.h"
40
 
 
41
 
#define CS_THREAD_TYPE                          int
42
 
 
43
 
/* Types of threads: */
44
 
#define CS_ANY_THREAD                           0
45
 
#define CS_THREAD                                       1
46
 
 
47
 
typedef struct CSCallStack {
48
 
        const char*     cs_func;
49
 
        const char*     cs_file;
50
 
        int                     cs_line;
51
 
} CSCallStack, *CSCallStackPtr;
52
 
 
53
 
/* 
54
 
 * The release stack contains objects that need to be
55
 
 * released when an exception occurs.
56
 
 */
57
 
#define CS_RELEASE_OBJECT               1
58
 
#define CS_RELEASE_MUTEX                2
59
 
#define CS_RELEASE_POOLED               3
60
 
#define CS_RELEASE_MEM                  4
61
 
#define CS_RELEASE_OBJECT_PTR   5
62
 
 
63
 
typedef struct CSRelease {
64
 
        int                                             r_type;
65
 
        union {
66
 
                CSObject                        *r_object;                                      /* The object to be released. */
67
 
                CSMutex                         *r_mutex;                                       /* The mutex to be unlocked! */
68
 
                CSPooled                        *r_pooled;
69
 
                void                            *r_mem;
70
 
                CSObject                        **r_objectPtr;
71
 
        } x;
72
 
} CSReleaseRec, *CSReleasePtr;
73
 
 
74
 
typedef struct CSJumpBuf {
75
 
        CSReleasePtr                    jb_res_top;
76
 
        int                                             jb_call_top;
77
 
        jmp_buf                                 jb_buffer;
78
 
} CSJumpBufRec, *CSJumpBufPtr;
79
 
 
80
 
class CSThreadList: public CSLinkedList, public CSMutex {
81
 
public:
82
 
        CSThreadList():
83
 
                CSLinkedList(),
84
 
                CSMutex()
85
 
        {
86
 
        }
87
 
 
88
 
        virtual ~CSThreadList() {
89
 
                stopAllThreads();
90
 
        }
91
 
 
92
 
        /**
93
 
         * Send the given signal to all threads, except to self!
94
 
         */
95
 
        void signalAllThreads(int sig);
96
 
 
97
 
        void quitAllThreads();
98
 
 
99
 
        void stopAllThreads();
100
 
};
101
 
 
102
 
typedef void *(*ThreadRunFunc)();
103
 
 
104
 
class CSThread : public CSRefObject {
105
 
public:
106
 
        /* The name of the thread. */
107
 
        CSString                *threadName;
108
 
        CSThreadList    *myThreadList;                          /* The thread list that this thread belongs to. */
109
 
 
110
 
        /* If this value is non-zero, this signal is pending and
111
 
         * must be thrown.
112
 
         *
113
 
         * SIGTERM, SIGQUIT - Means the thread has been terminated.
114
 
         * SIGINT - Means the thread has been interrupted.
115
 
         *
116
 
         * When a signal is throw it clears this value. This includes
117
 
         * the case when system calls return error due to interrupt.
118
 
         */
119
 
        int                             signalPending;
120
 
        bool                    ignoreSignals;
121
 
 
122
 
        /* Set to true once the thread is running (never reset!). */
123
 
        bool                    isRunning;
124
 
 
125
 
        /* Set to true when the thread must quit (never reset!): */
126
 
        bool                    myMustQuit;     
127
 
        
128
 
        CSException             myException;
129
 
#if defined(MYSQL_SERVER) ||  defined(DRIZZLED)
130
 
 
131
 
        /* Set to true when this tread was initialized through the internal PBMS api. */
132
 
        /* When this is the case than it must only be freed via the API as well. */
133
 
        bool                    pbms_api_owner;
134
 
 
135
 
        /* Transaction references. */
136
 
#ifdef DRIZZLED
137
 
        CSSortedList    mySavePoints;
138
 
#endif
139
 
        uint32_t                myTID;                  // Current transaction ID
140
 
        uint32_t                myTransRef;             // Reference to the current transaction cache index
141
 
        bool                    myIsAutoCommit; // Is the current transaction in auto commit mode.
142
 
        uint32_t                myCacheVersion; // The last transaction cache version checked. Used during overflow.
143
 
        bool                    myStartTxn;             // A flag to indicate the start of a new transaction.
144
 
        uint32_t                myStmtCount;    // Counts the number of statements in the current transaction.
145
 
        uint32_t                myStartStmt;    // The myStmtCount at the start of the last logical statement. (An update is 2 statements but only 1 logical statement.)
146
 
        void                    *myInfo;
147
 
#endif
148
 
        
149
 
        /* The call stack */
150
 
        int                             callTop;
151
 
        CSCallStack             callStack[CS_CALL_STACK_SIZE];
152
 
 
153
 
        /* The long jump stack: */
154
 
        int                             jumpDepth;                                                      /* The current jump depth */
155
 
        CSJumpBufRec    jumpEnv[CS_JUMP_STACK_SIZE];            /* The process environment to be restored on exception */
156
 
 
157
 
        /* The release stack */
158
 
        CSReleasePtr    relTop;                                                         /* The top of the resource stack (reference next free space). */
159
 
        CSReleaseRec    relStack[CS_RELEASE_STACK_SIZE];        /* Temporary data to be freed if an exception occurs. */
160
 
 
161
 
        CSThread(CSThreadList *list):
162
 
                CSRefObject(),
163
 
                threadName(NULL),
164
 
                myThreadList(list),
165
 
                signalPending(0),
166
 
                ignoreSignals(false),
167
 
                isRunning(false),
168
 
                myMustQuit(false),
169
 
#if defined(MYSQL_SERVER) ||  defined(DRIZZLED)
170
 
                pbms_api_owner(false),
171
 
                myTID(0),
172
 
                myTransRef(0),
173
 
                myIsAutoCommit(true),
174
 
                myCacheVersion(0),
175
 
                myStartTxn(true),
176
 
                myStmtCount(0),
177
 
                myStartStmt(0),
178
 
                myInfo(NULL),
179
 
#endif
180
 
                callTop(0),
181
 
                jumpDepth(0),
182
 
                relTop(relStack),
183
 
                iIsMain(false),
184
 
                isDetached(false),
185
 
                iRunFunc(NULL),
186
 
                iNextLink(NULL),
187
 
                iPrevLink(NULL)
188
 
        {
189
 
        }
190
 
 
191
 
        virtual ~CSThread() {
192
 
                if (threadName)
193
 
                        threadName->release();
194
 
        }
195
 
 
196
 
    /**
197
 
     * Task to be performed by this thread.
198
 
         *
199
 
     * @exception CSSystemException thrown if thread cannot be run.
200
 
         */
201
 
        virtual void *run();
202
 
 
203
 
        /**
204
 
         * Start execution of the thread.
205
 
         *
206
 
     * @exception CSSystemException thrown if thread is invalid.
207
 
         */
208
 
        void start(bool detached = false);
209
 
 
210
 
        /*
211
 
         * Stop execution of the thread.
212
 
         */
213
 
        virtual void stop();
214
 
 
215
 
        /**
216
 
         * Wait for this thread to die.
217
 
         *
218
 
     * @exception CSSystemException thrown if thread is invalid.
219
 
         */
220
 
        void *join();
221
 
 
222
 
        /**
223
 
         * Signal the thread. Throws CSSystemException 
224
 
     * if the thread is invalid.
225
 
         */
226
 
        void signal(unsigned int);
227
 
 
228
 
        void setSignalPending(unsigned int);
229
 
 
230
 
        /**
231
 
         * Check to see if we have been interrupted by a signal
232
 
         * (i.e. there is a signal pending).
233
 
         * This function throws CSSignalException if
234
 
         * there is a signal pending.
235
 
         */
236
 
        void interrupted() { if (signalPending) throwSignal(); }
237
 
        void throwSignal();
238
 
 
239
 
        /* Log the stack to the specified depth along with the message. */
240
 
        void logStack(int depth, const char *msg);
241
 
 
242
 
        /* Log the exception, and the current stack. */
243
 
        void logException();
244
 
        
245
 
        /* Log the exception, and the current stack. */
246
 
        void logMessage();
247
 
        
248
 
        /*
249
 
         * Return true if this is the main thread.
250
 
         */
251
 
        bool isMain();
252
 
 
253
 
        /*
254
 
         * Throwing exceptions:
255
 
         */
256
 
        void releaseObjects(CSReleasePtr top);
257
 
        void throwException();
258
 
        void caught();
259
 
        bool isMe(CSThread *me) { return (pthread_equal(me->iThread,iThread) != 0);}
260
 
        /* Make this object linkable: */
261
 
        virtual CSObject *getNextLink() { return iNextLink; }
262
 
        virtual CSObject *getPrevLink() { return iPrevLink; }
263
 
        virtual void setNextLink(CSObject *link) { iNextLink = link; }
264
 
        virtual void setPrevLink(CSObject *link) { iPrevLink = link; }
265
 
 
266
 
        friend class CSDaemon;
267
 
 
268
 
private:
269
 
        pthread_t               iThread;
270
 
        bool                    iIsMain;
271
 
        bool                    isDetached;
272
 
        ThreadRunFunc   iRunFunc;
273
 
        CSObject                *iNextLink;
274
 
        CSObject                *iPrevLink;
275
 
 
276
 
        void addToList();
277
 
        void removeFromList();
278
 
 
279
 
public:
280
 
        /* Each thread stores is thread object in this key: */
281
 
        static pthread_key_t sThreadKey;
282
 
 
283
 
   /**
284
 
     * Put the currently executing thread to sleep for a given amount of
285
 
     * time.
286
 
     *
287
 
     * @param timeout maximum amount of time (milliseconds) this method could block
288
 
     *
289
 
     * @exception TDInterruptedException thrown if the threads sleep is interrupted
290
 
     *            before <i>timeout</i> milliseconds expire.
291
 
     */
292
 
        static void sleep(unsigned long timeout);
293
 
 
294
 
        /* Do static initialization and de-initialization. */
295
 
        static bool isUp;
296
 
        static bool startUp();
297
 
        static void shutDown();
298
 
 
299
 
        /* Attach and detach an already running thread: */
300
 
        static bool attach(CSThread *thread);
301
 
        static void detach(CSThread *thread);
302
 
 
303
 
        /**
304
 
         * Return the thread object of the current
305
 
         * thread.
306
 
         */
307
 
        static CSThread *getSelf();
308
 
        static bool setSelf(CSThread *self);
309
 
 
310
 
        static CSThread *newCSThread();
311
 
        static CSThread *newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list);
312
 
 
313
 
        /* called for a newly created thread. */
314
 
        static void *dispatch(void *arg);
315
 
 
316
 
};
317
 
 
318
 
class CSDaemon : public CSThread, public CSSync {
319
 
public:
320
 
        time_t                  myWaitTime;                                     /* Wait time in milli-seconds */
321
 
 
322
 
        CSDaemon(time_t wait_time, CSThreadList *list);
323
 
        CSDaemon(CSThreadList *list);
324
 
        virtual ~CSDaemon() { }
325
 
 
326
 
        virtual void *run();
327
 
 
328
 
        /* Return false if startup failed, and the thread must quit. */
329
 
        virtual bool initializeWork() { return true; };
330
 
 
331
 
        /* Return true of the thread should sleep before doing more work. */
332
 
        virtual bool doWork();
333
 
 
334
 
        virtual void *completeWork() { return NULL; };
335
 
 
336
 
        /* Return false if the excpetion is not handled and the thread must quit.
337
 
         * Set must_sleep to true of the thread should pause before doing work
338
 
         * again.
339
 
         */
340
 
        virtual bool handleException();
341
 
 
342
 
        virtual void stop();
343
 
 
344
 
        void wakeup();
345
 
 
346
 
        void suspend();
347
 
 
348
 
        bool isSuspend() { return (iSuspendCount != 0);} // Don't use iSuspended, we are interested in if suspend() was called.
349
 
 
350
 
        void resume();
351
 
 
352
 
        virtual void returnToPool() {
353
 
                resume();
354
 
                release();
355
 
        }
356
 
 
357
 
        void suspended();
358
 
 
359
 
        void suspendedWait();
360
 
 
361
 
        void suspendedWait(time_t milli_sec);
362
 
 
363
 
private:
364
 
        void            try_Run(CSThread *self, const bool must_sleep);
365
 
        bool            iSuspended;
366
 
        uint32_t        iSuspendCount;
367
 
};
368
 
 
369
 
#endif