~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSThread.h

Added the PBMS daemon plugin.

(Augen zu und durch!)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh (H&G2JCtL)
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-05-20
 
23
 *
 
24
 * CORE SYSTEM:
 
25
 * A independently running thread.
 
26
 *
 
27
 */
 
28
 
 
29
#ifndef __CSTHREAD_H__
 
30
#define __CSTHREAD_H__
 
31
 
 
32
#include <pthread.h>
 
33
#include <setjmp.h>
 
34
 
 
35
#include "CSDefs.h"
 
36
#include "CSMutex.h"
 
37
#include "CSException.h"
 
38
#include "CSStorage.h"
 
39
 
 
40
using namespace std;
 
41
 
 
42
#define CS_THREAD_TYPE                          int
 
43
 
 
44
/* Types of threads: */
 
45
#define CS_ANY_THREAD                           0
 
46
#define CS_THREAD                                       1
 
47
 
 
48
typedef struct CSCallStack {
 
49
        const char*     cs_func;
 
50
        const char*     cs_file;
 
51
        int                     cs_line;
 
52
} CSCallStack, *CSCallStackPtr;
 
53
 
 
54
/* 
 
55
 * The release stack contains objects that need to be
 
56
 * released when an exception occurs.
 
57
 */
 
58
#define CS_RELEASE_OBJECT               1
 
59
#define CS_RELEASE_MUTEX                2
 
60
#define CS_RELEASE_POOLED               3
 
61
 
 
62
typedef struct CSRelease {
 
63
        int                                             r_type;
 
64
        union {
 
65
                CSObject                        *r_object;                                      /* The object to be released. */
 
66
                CSMutex                         *r_mutex;                                       /* The mutex to be unlocked! */
 
67
                CSPooled                        *r_pooled;
 
68
        } x;
 
69
} CSReleaseRec, *CSReleasePtr;
 
70
 
 
71
typedef struct CSJumpBuf {
 
72
        CSReleasePtr                    jb_res_top;
 
73
        int                                             jb_call_top;
 
74
        jmp_buf                                 jb_buffer;
 
75
} CSJumpBufRec, *CSJumpBufPtr;
 
76
 
 
77
class CSThreadList: public CSLinkedList, public CSMutex {
 
78
public:
 
79
        CSThreadList():
 
80
                CSLinkedList(),
 
81
                CSMutex()
 
82
        {
 
83
        }
 
84
 
 
85
        virtual ~CSThreadList() {
 
86
                stopAllThreads();
 
87
        }
 
88
 
 
89
        /**
 
90
         * Send the given signal to all threads, except to self!
 
91
         */
 
92
        void signalAllThreads(int sig);
 
93
 
 
94
        void quitAllThreads();
 
95
 
 
96
        void stopAllThreads();
 
97
};
 
98
 
 
99
typedef void *(*ThreadRunFunc)();
 
100
 
 
101
class CSThread : public CSRefObject {
 
102
public:
 
103
        /* The name of the thread. */
 
104
        CSString                *threadName;
 
105
        CSThreadList    *myThreadList;                          /* The thread list that this thread belongs to. */
 
106
 
 
107
        /* If this value is non-zero, this signal is pending and
 
108
         * must be thrown.
 
109
         *
 
110
         * SIGTERM, SIGQUIT - Means the thread has been terminated.
 
111
         * SIGINT - Means the thread has been interrupted.
 
112
         *
 
113
         * When a signal is throw it clears this value. This includes
 
114
         * the case when system calls return error due to interrupt.
 
115
         */
 
116
        int                             signalPending;
 
117
        bool                    ignoreSignals;
 
118
 
 
119
        /* Set to true once the thread is running (never reset!). */
 
120
        bool                    isRunning;
 
121
 
 
122
        /* Set to true when the thread must quit (never reset!): */
 
123
        bool                    myMustQuit;     
 
124
        
 
125
        /* Set to true when this tread was initialized through the internal PBMS api. */
 
126
        /* When this is the case than it must only be freed via the API as well. */
 
127
        bool                    pbms_api_owner;
 
128
 
 
129
        CSException             myException;
 
130
 
 
131
        /* Transaction references. */
 
132
        uint32_t                        myTID;                  // Current transaction ID
 
133
        uint32_t                        myTransRef;             // Reference to the current transaction cache index
 
134
        bool                    myIsAutoCommit; // Is the current transaction in auto commit mode.
 
135
        uint32_t                        myCacheVersion; // The last transaction cache version checked. Used during overflow.
 
136
        bool                    myStartTxn;             // A flag to indicate the start of a new transaction.
 
137
        uint32_t                        myStmtCount;    // Counts the number of statements in the current transaction.
 
138
        uint32_t                        myStartStmt;    // The myStmtCount at the start of the last logical statement. (An update is 2 statements but only 1 logical statement.)
 
139
        void                    *myInfo;                // A place to hang some info. (Be carefull with this!)
 
140
        
 
141
        /* The call stack */
 
142
        int                             callTop;
 
143
        CSCallStack             callStack[CS_CALL_STACK_SIZE];
 
144
 
 
145
        /* The long jump stack: */
 
146
        int                             jumpDepth;                                                      /* The current jump depth */
 
147
        CSJumpBufRec    jumpEnv[CS_JUMP_STACK_SIZE];            /* The process environment to be restored on exception */
 
148
 
 
149
        /* The release stack */
 
150
        CSReleasePtr    relTop;                                                         /* The top of the resource stack (reference next free space). */
 
151
        CSReleaseRec    relStack[CS_RELEASE_STACK_SIZE];        /* Temporary data to be freed if an exception occurs. */
 
152
 
 
153
        CSThread(CSThreadList *list):
 
154
                CSRefObject(),
 
155
                threadName(NULL),
 
156
                myThreadList(list),
 
157
                signalPending(0),
 
158
                ignoreSignals(false),
 
159
                isRunning(false),
 
160
                myMustQuit(false),
 
161
                callTop(0),
 
162
                jumpDepth(0),
 
163
                myInfo(NULL),
 
164
                myTID(0),
 
165
                myTransRef(0),
 
166
                myStartTxn(true),
 
167
                myStmtCount(0),
 
168
                myStartStmt(0),
 
169
                myCacheVersion(0),
 
170
                relTop(relStack),
 
171
                iIsMain(false),
 
172
                iRunFunc(NULL),
 
173
                iNextLink(NULL),
 
174
                iPrevLink(NULL),
 
175
                pbms_api_owner(false)
 
176
        {
 
177
        }
 
178
 
 
179
        virtual ~CSThread() {
 
180
                if (threadName)
 
181
                        threadName->release();
 
182
        }
 
183
 
 
184
    /**
 
185
     * Task to be performed by this thread.
 
186
         *
 
187
     * @exception CSSystemException thrown if thread cannot be run.
 
188
         */
 
189
        virtual void *run();
 
190
 
 
191
        /**
 
192
         * Start execution of the thread.
 
193
         *
 
194
     * @exception CSSystemException thrown if thread is invalid.
 
195
         */
 
196
        void start();
 
197
 
 
198
        /*
 
199
         * Stop execution of the thread.
 
200
         */
 
201
        virtual void stop();
 
202
 
 
203
        /**
 
204
         * Wait for this thread to die.
 
205
         *
 
206
     * @exception CSSystemException thrown if thread is invalid.
 
207
         */
 
208
        void *join();
 
209
 
 
210
        /**
 
211
         * Signal the thread. Throws CSSystemException 
 
212
     * if the thread is invalid.
 
213
         */
 
214
        void signal(unsigned int);
 
215
 
 
216
        void setSignalPending(unsigned int);
 
217
 
 
218
        /**
 
219
         * Check to see if we have been interrupted by a signal
 
220
         * (i.e. there is a signal pending).
 
221
         * This function throws CSSignalException if
 
222
         * there is a signal pending.
 
223
         */
 
224
        void interrupted() { if (signalPending) throwSignal(); }
 
225
        void throwSignal();
 
226
 
 
227
        /* Log the stack to the specified depth along with the message. */
 
228
        void logStack(int depth, const char *msg);
 
229
 
 
230
        /* Log the exception, and the current stack. */
 
231
        void logException();
 
232
        
 
233
        /* Log the exception, and the current stack. */
 
234
        void logMessage();
 
235
        
 
236
        /*
 
237
         * Return true if this is the main thread.
 
238
         */
 
239
        bool isMain();
 
240
 
 
241
        /*
 
242
         * Throwing exceptions:
 
243
         */
 
244
        void releaseObjects(CSReleasePtr top);
 
245
        void throwException();
 
246
        void caught();
 
247
        bool isMe(CSThread *me) { return (me->iThread == iThread);}
 
248
        /* Make this object linkable: */
 
249
        virtual CSObject *getNextLink() { return iNextLink; }
 
250
        virtual CSObject *getPrevLink() { return iPrevLink; }
 
251
        virtual void setNextLink(CSObject *link) { iNextLink = link; }
 
252
        virtual void setPrevLink(CSObject *link) { iPrevLink = link; }
 
253
 
 
254
        friend class CSDaemon;
 
255
 
 
256
        friend void *td_dispatch(void *arg);
 
257
 
 
258
private:
 
259
        pthread_t               iThread;
 
260
        bool                    iIsMain;
 
261
        ThreadRunFunc   iRunFunc;
 
262
        CSObject                *iNextLink;
 
263
        CSObject                *iPrevLink;
 
264
 
 
265
        void addToList();
 
266
        void removeFromList();
 
267
 
 
268
public:
 
269
        /* Each thread stores is thread object in this key: */
 
270
        static pthread_key_t sThreadKey;
 
271
 
 
272
   /**
 
273
     * Put the currently executing thread to sleep for a given amount of
 
274
     * time.
 
275
     *
 
276
     * @param timeout maximum amount of time (milliseconds) this method could block
 
277
     *
 
278
     * @exception TDInterruptedException thrown if the threads sleep is interrupted
 
279
     *            before <i>timeout</i> milliseconds expire.
 
280
     */
 
281
        static void sleep(unsigned long timeout);
 
282
 
 
283
        /* Do static initialization and de-initialization. */
 
284
        static bool startUp();
 
285
        static void shutDown();
 
286
 
 
287
        /* Attach and detach an already running thread: */
 
288
        static bool attach(CSThread *thread);
 
289
        static void detach(CSThread *thread);
 
290
 
 
291
        /**
 
292
         * Return the thread object of the current
 
293
         * thread.
 
294
         */
 
295
        static CSThread *getSelf();
 
296
        static bool setSelf(CSThread *self);
 
297
 
 
298
        static CSThread *newCSThread();
 
299
        static CSThread *newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list);
 
300
};
 
301
 
 
302
class CSDaemon : public CSThread, public CSSync {
 
303
public:
 
304
        time_t                  myWaitTime;                                     /* Wait time in milli-seconds */
 
305
 
 
306
        CSDaemon(time_t wait_time, CSThreadList *list);
 
307
        CSDaemon(CSThreadList *list);
 
308
        virtual ~CSDaemon() { }
 
309
 
 
310
        virtual void *run();
 
311
 
 
312
        virtual bool initialize() { return true; };
 
313
 
 
314
        virtual bool doWork();
 
315
 
 
316
        virtual void *finalize() { return NULL; };
 
317
 
 
318
        virtual bool handleException();
 
319
 
 
320
        virtual void stop();
 
321
 
 
322
        void wakeup();
 
323
 
 
324
        void suspend();
 
325
 
 
326
        bool isSuspend() { return (iSuspendCount != 0);} // Don't use iSuspended, we are interested in if suspend() was called.
 
327
 
 
328
        void resume();
 
329
 
 
330
        virtual void returnToPool() {
 
331
                resume();
 
332
                release();
 
333
        }
 
334
 
 
335
        void suspended();
 
336
 
 
337
        void suspendedWait();
 
338
 
 
339
        void suspendedWait(time_t milli_sec);
 
340
 
 
341
private:
 
342
        bool                    iSuspended;
 
343
        u_int                   iSuspendCount;
 
344
};
 
345
 
 
346
#endif