1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
|
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
*
* PrimeBase Media Stream for MySQL
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Original author: Paul McCullagh (H&G2JCtL)
* Continued development: Barry Leslie
*
* 2007-05-20
*
* CORE SYSTEM:
* An independently running thread.
*
*/
#ifndef __CSTHREAD_H__
#define __CSTHREAD_H__
#include <pthread.h>
#include <setjmp.h>
#include "CSDefs.h"
#include "CSMutex.h"
#include "CSException.h"
#include "CSStorage.h"
using namespace std;
#define CS_THREAD_TYPE int
/* Types of threads: */
#define CS_ANY_THREAD 0
#define CS_THREAD 1
/*
 * One entry of a thread's call stack. CSThread holds a fixed array of
 * these (callStack[CS_CALL_STACK_SIZE]) together with the index callTop.
 * The fields carry a function name, a file name and a line number; the
 * code that fills them in is not part of this header.
 */
struct CSCallStack {
	const char	*cs_func;	/* Name of the function. */
	const char	*cs_file;	/* Name of the source file. */
	int			cs_line;	/* Line number. */
};
/* Keep "CSCallStack" available as a typedef-name as well as a struct tag. */
typedef struct CSCallStack CSCallStack;
/* Pointer to a call stack entry. */
typedef struct CSCallStack *CSCallStackPtr;
/*
* The release stack contains objects that need to be
* released when an exception occurs.
*/
/* Kinds of release stack entry.
 * NOTE(review): presumably these are the values stored in CSRelease::r_type,
 * each selecting the like-named member of the union CSRelease::x
 * (r_object, r_mutex, r_pooled, r_mem) - confirm in CSThread::releaseObjects(). */
#define CS_RELEASE_OBJECT 1
#define CS_RELEASE_MUTEX 2
#define CS_RELEASE_POOLED 3
#define CS_RELEASE_MEM 4
/*
 * One entry of the release stack: a type tag plus a union holding a
 * pointer to the resource. CSThread keeps an array of these
 * (relStack[]) and a pointer to the next free slot (relTop).
 * NOTE(review): r_type presumably holds one of the CS_RELEASE_* values
 * and tells which member of x is valid - confirm in releaseObjects().
 */
struct CSRelease {
	int			r_type;
	union {
		CSObject	*r_object;	/* The object to be released. */
		CSMutex		*r_mutex;	/* The mutex to be unlocked! */
		CSPooled	*r_pooled;
		void		*r_mem;
	} x;
};
/* Record and pointer names used by the rest of the code. */
typedef struct CSRelease CSReleaseRec;
typedef struct CSRelease *CSReleasePtr;
/*
 * One level of a thread's long jump stack (CSThread::jumpEnv[]):
 * a jmp_buf plus a release stack pointer and a call stack index.
 * NOTE(review): jb_res_top and jb_call_top presumably snapshot
 * CSThread::relTop and CSThread::callTop at the time the jump buffer
 * was set up, so both can be restored on a longjmp - confirm in the
 * implementation.
 */
struct CSJumpBuf {
	CSReleasePtr	jb_res_top;
	int				jb_call_top;
	jmp_buf			jb_buffer;
};
/* Record and pointer names used by the rest of the code. */
typedef struct CSJumpBuf CSJumpBufRec;
typedef struct CSJumpBuf *CSJumpBufPtr;
/*
 * A list of threads. It is both a linked list (CSLinkedList) and a
 * mutex (CSMutex); each CSThread records the list it belongs to in
 * CSThread::myThreadList.
 */
class CSThreadList : public CSLinkedList, public CSMutex {
public:
	/* Create an empty list; both bases are default constructed. */
	CSThreadList() : CSLinkedList(), CSMutex() { }

	/* Destroying the list first calls stopAllThreads(). */
	virtual ~CSThreadList()
	{
		stopAllThreads();
	}

	/**
	 * Send the given signal to all threads, except to self!
	 */
	void signalAllThreads(int sig);

	/*
	 * Declared here, implemented elsewhere.
	 * NOTE(review): by their names these ask every thread in the list
	 * to quit, respectively stop every thread - confirm the exact
	 * semantics (blocking or not) in the implementation.
	 */
	void quitAllThreads();
	void stopAllThreads();
};
/* Pointer to a function taking no arguments and returning void *.
 * CSThread stores one in iRunFunc and CSThread::newThread() accepts one as run_func. */
typedef void *(*ThreadRunFunc)();
/*
 * An independently running thread.
 *
 * Besides the pthread handle the object carries the per-thread state
 * declared below: pending signal, transaction references, a call stack,
 * a long jump stack and a release stack. It implements the
 * get/set Next/Prev link methods so that it can be placed in a
 * linked list (see myThreadList).
 *
 * NOTE(review): relTop is initialised to point into this object's own
 * relStack[], so a member-wise copy of a CSThread would leave the copy
 * pointing into the source object. Confirm that copying is prevented
 * (e.g. by CSRefObject).
 */
class CSThread : public CSRefObject {
public:
	/* The name of the thread. Released in the destructor if non-NULL. */
	CSString		*threadName;
	CSThreadList	*myThreadList;	/* The thread list that this thread belongs to. */

	/* If this value is non-zero, this signal is pending and
	 * must be thrown.
	 *
	 * SIGTERM, SIGQUIT - Means the thread has been terminated.
	 * SIGINT - Means the thread has been interrupted.
	 *
	 * When a signal is thrown it clears this value. This includes
	 * the case when system calls return error due to interrupt.
	 */
	int				signalPending;
	bool			ignoreSignals;

	/* Set to true once the thread is running (never reset!). */
	bool			isRunning;

	/* Set to true when the thread must quit (never reset!): */
	bool			myMustQuit;

	/* Set to true when this thread was initialized through the internal PBMS api. */
	/* When this is the case then it must only be freed via the API as well. */
	bool			pbms_api_owner;

	CSException		myException;

	/* Transaction references. */
#ifdef DRIZZLED
	CSSortedList	mySavePoints;
#endif
	uint32_t		myTID;			// Current transaction ID
	uint32_t		myTransRef;		// Reference to the current transaction cache index
	bool			myIsAutoCommit;	// Is the current transaction in auto commit mode.
	uint32_t		myCacheVersion;	// The last transaction cache version checked. Used during overflow.
	bool			myStartTxn;		// A flag to indicate the start of a new transaction.
	uint32_t		myStmtCount;	// Counts the number of statements in the current transaction.
	uint32_t		myStartStmt;	// The myStmtCount at the start of the last logical statement. (An update is 2 statements but only 1 logical statement.)
	void			*myInfo;		// A place to hang some info. (Be careful with this!)

	/* The call stack */
	int				callTop;
	CSCallStack		callStack[CS_CALL_STACK_SIZE];

	/* The long jump stack: */
	int				jumpDepth;							/* The current jump depth */
	CSJumpBufRec	jumpEnv[CS_JUMP_STACK_SIZE];		/* The process environment to be restored on exception */

	/* The release stack */
	CSReleasePtr	relTop;								/* The top of the resource stack (reference next free space). */
	CSReleaseRec	relStack[CS_RELEASE_STACK_SIZE];	/* Temporary data to be freed if an exception occurs. */

	/*
	 * Create a thread object belonging to the given list.
	 *
	 * All flags and counters start cleared, except myIsAutoCommit and
	 * myStartTxn which start true. The three stacks start empty
	 * (callTop = 0, jumpDepth = 0, relTop = start of relStack).
	 * The constructor only records the list pointer; it does not link
	 * the thread into the list.
	 */
	CSThread(CSThreadList *list):
		CSRefObject(),
		threadName(NULL),
		myThreadList(list),
		signalPending(0),
		ignoreSignals(false),
		isRunning(false),
		myMustQuit(false),
		pbms_api_owner(false),
		myTID(0),
		myTransRef(0),
		myIsAutoCommit(true),
		myCacheVersion(0),
		myStartTxn(true),
		myStmtCount(0),
		myStartStmt(0),
		myInfo(NULL),
		callTop(0),
		jumpDepth(0),
		relTop(relStack),
		/* Value-initialise the handle so that it is never read while
		 * indeterminate. This is NOT a valid thread id; presumably the
		 * real id is stored by start()/attach() - confirm. */
		iThread(),
		iIsMain(false),
		iRunFunc(NULL),
		iNextLink(NULL),
		iPrevLink(NULL)
	{
	}

	/* Releases the reference held on threadName, if any. */
	virtual ~CSThread() {
		if (threadName)
			threadName->release();
	}

	/**
	 * Task to be performed by this thread.
	 *
	 * @exception CSSystemException thrown if thread cannot be run.
	 */
	virtual void *run();

	/**
	 * Start execution of the thread.
	 *
	 * @exception CSSystemException thrown if thread is invalid.
	 */
	void start();

	/*
	 * Stop execution of the thread.
	 */
	virtual void stop();

	/**
	 * Wait for this thread to die.
	 *
	 * @exception CSSystemException thrown if thread is invalid.
	 */
	void *join();

	/**
	 * Signal the thread. Throws CSSystemException
	 * if the thread is invalid.
	 */
	void signal(unsigned int);

	void setSignalPending(unsigned int);

	/**
	 * Check to see if we have been interrupted by a signal
	 * (i.e. there is a signal pending).
	 * This function throws CSSignalException if
	 * there is a signal pending.
	 */
	void interrupted() { if (signalPending) throwSignal(); }
	void throwSignal();

	/* Log the stack to the specified depth along with the message. */
	void logStack(int depth, const char *msg);

	/* Log the exception, and the current stack. */
	void logException();

	/* Log a message.
	 * NOTE(review): the comment originally found here was a copy of the
	 * one on logException(); confirm what exactly is logged. */
	void logMessage();

	/*
	 * Return true if this is the main thread.
	 */
	bool isMain();

	/*
	 * Throwing exceptions:
	 */
	void releaseObjects(CSReleasePtr top);
	void throwException();
	void caught();

	/*
	 * Return true if 'me' and this object refer to the same pthread.
	 * 'me' must not be NULL.
	 *
	 * pthread_t is an opaque type (it may be a structure), so the
	 * handles are compared with pthread_equal() and not with '=='.
	 */
	bool isMe(CSThread *me) { return pthread_equal(me->iThread, iThread) != 0; }

	/* Make this object linkable: */
	virtual CSObject *getNextLink() { return iNextLink; }
	virtual CSObject *getPrevLink() { return iPrevLink; }
	virtual void setNextLink(CSObject *link) { iNextLink = link; }
	virtual void setPrevLink(CSObject *link) { iPrevLink = link; }

	friend class CSDaemon;
private:
	pthread_t		iThread;
	bool			iIsMain;
	ThreadRunFunc	iRunFunc;
	CSObject		*iNextLink;
	CSObject		*iPrevLink;

	void addToList();
	void removeFromList();

public:
	/* Each thread stores its thread object in this key: */
	static pthread_key_t sThreadKey;

	/**
	 * Put the currently executing thread to sleep for a given amount of
	 * time.
	 *
	 * @param timeout maximum amount of time (milliseconds) this method could block
	 *
	 * @exception TDInterruptedException thrown if the threads sleep is interrupted
	 * before <i>timeout</i> milliseconds expire.
	 */
	static void sleep(unsigned long timeout);

	/* Do static initialization and de-initialization. */
	static bool isUp;
	static bool startUp();
	static void shutDown();

	/* Attach and detach an already running thread: */
	static bool attach(CSThread *thread);
	static void detach(CSThread *thread);

	/**
	 * Return the thread object of the current
	 * thread.
	 */
	static CSThread *getSelf();
	static bool setSelf(CSThread *self);

	static CSThread *newCSThread();
	static CSThread *newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list);

	/* called for a newly created thread. */
	static void *dispatch(void *arg);
};
/*
 * A thread (CSThread) that is also a CSSync object and that can be
 * suspended and resumed.
 *
 * NOTE(review): the hooks initializeWork() / doWork() / completeWork() /
 * handleException() are presumably driven by run(); the loop itself is
 * implemented outside this header - confirm there.
 */
class CSDaemon : public CSThread, public CSSync {
public:
	/* Wait time in milli-seconds */
	time_t myWaitTime;

	CSDaemon(time_t wait_time, CSThreadList *list);
	CSDaemon(CSThreadList *list);
	virtual ~CSDaemon() { }

	virtual void *run();

	/* Default implementation does nothing and reports success. */
	virtual bool initializeWork()
	{
		return true;
	}

	virtual bool doWork();

	/* Default implementation does nothing and returns NULL. */
	virtual void *completeWork()
	{
		return NULL;
	}

	virtual bool handleException();

	virtual void stop();

	void wakeup();

	void suspend();

	/*
	 * True while the suspend count is non-zero.
	 * Don't use iSuspended, we are interested in if suspend() was called.
	 */
	bool isSuspend()
	{
		return iSuspendCount != 0;
	}

	void resume();

	/* Calls resume() and then release(), in that order. */
	virtual void returnToPool()
	{
		resume();
		release();
	}

	void suspended();
	void suspendedWait();
	void suspendedWait(time_t milli_sec);

private:
	bool		iSuspended;
	uint32_t	iSuspendCount;
};
#endif
|