~drizzle-trunk/drizzle/development

1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
2
 *
3
 * PrimeBase Media Stream for MySQL
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License
16
 * along with this program; if not, write to the Free Software
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 *
19
 * Original author: Paul McCullagh (H&G2JCtL)
20
 * Continued development: Barry Leslie
21
 *
22
 * 2007-05-20
23
 *
24
 * CORE SYSTEM:
25
 * A independently running thread.
26
 *
27
 */
28
29
#include "CSConfig.h"
30
31
#ifdef OS_WINDOWS
32
#include <signal.h>
33
#include "uniwin.h"
34
#else
35
#include <signal.h>
36
#include <sys/signal.h>
37
#endif
38
#include <unistd.h>
39
#include <errno.h>
40
41
#include "CSGlobal.h"
42
#include "CSLog.h"
43
#include "CSException.h"
44
#include "CSThread.h"
45
#include "CSStrUtil.h"
46
#include "CSMemory.h"
47
48
/*
49
 * ---------------------------------------------------------------
50
 * SIGNAL HANDLERS
51
 */
52
53
extern "C" {
54
55
56
static void td_catch_signal(int sig)
57
{
58
	CSThread *self;
59
60
	if ((self = CSThread::getSelf())) {
61
		if (self->isMain()) {
62
			/* The main thread will pass on a signal to all threads: */
63
			if (self->myThreadList)
64
				self->myThreadList->signalAllThreads(sig);
65
			self->setSignalPending(sig);
66
		}
67
	}
68
	
69
}
70
71
static  void td_throw_signal(int sig)
72
{
73
	CSThread *self;
74
75
	if ((self = CSThread::getSelf())) {
76
		if (self->isMain()) {
77
			/* The main thread will pass on a signal to all threads: */
78
			if (self->myThreadList)
79
				self->myThreadList->signalAllThreads(sig);
80
		}
81
		self->setSignalPending(sig);
82
		self->interrupted();
83
	}
84
}
85
86
static bool td_setup_signals(CSThread *thread)
87
{
88
#ifdef OS_WINDOWS
89
	return true;
90
#else
91
	struct sigaction action;
92
93
    sigemptyset(&action.sa_mask);
94
    action.sa_flags = 0;
95
96
    action.sa_handler = td_catch_signal;
97
98
	if (sigaction(SIGUSR2, &action, NULL) == -1)
99
		goto error_occurred;
100
101
    action.sa_handler = td_throw_signal;
102
103
	return true;
104
105
	error_occurred:
106
107
	if (thread) {
108
		thread->myException.initOSError(CS_CONTEXT, errno);
109
		thread->myException.setStackTrace(thread);
110
	}
111
	else
112
		CSException::throwOSError(CS_CONTEXT, errno);
113
	return false;
114
#endif
115
}
116
117
}
118
119
/*
120
 * ---------------------------------------------------------------
121
 * THREAD LISTS
122
 */
123
124
void CSThreadList::signalAllThreads(int sig)
125
{
126
	CSThread *ptr;
127
128
	enter_();
1548.2.19 by Barry.Leslie at PrimeBase
Fixes for longjmp clobber problem, (Hopefully)
129
	lock_(this);
130
	ptr = (CSThread *) getBack();
131
	while (ptr) {
132
		if (ptr != self)
133
			ptr->signal(sig);
134
		ptr = (CSThread *) ptr->getNextLink();
135
	}
136
	unlock_(this);
137
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
138
	exit_();
139
}
140
141
void CSThreadList::quitAllThreads()
142
{
143
	CSThread *ptr;
144
145
	enter_();
1548.2.19 by Barry.Leslie at PrimeBase
Fixes for longjmp clobber problem, (Hopefully)
146
	lock_(this);
147
	
148
	ptr = (CSThread *) getBack();
149
	while (ptr) {
150
		if (ptr != self)
151
			ptr->myMustQuit = true;
152
		ptr = (CSThread *) ptr->getNextLink();
153
	}
154
	
155
	unlock_(this);
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
156
	exit_();
157
}
158
159
void CSThreadList::stopAllThreads()
160
{
161
	CSThread *thread;
162
163
	enter_();
164
	for (;;) {
165
		/* Get a thread that is not self! */
1548.2.19 by Barry.Leslie at PrimeBase
Fixes for longjmp clobber problem, (Hopefully)
166
		lock_(this);
167
		if ((thread = (CSThread *) getBack())) {
168
			while (thread) {
169
				if (thread != self)
170
					break;
171
				thread = (CSThread *) thread->getNextLink();
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
172
			}
1548.2.19 by Barry.Leslie at PrimeBase
Fixes for longjmp clobber problem, (Hopefully)
173
		}
174
		if (thread)
175
			thread->retain();
176
		unlock_(this);
177
		
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
178
		if (!thread)
179
			break;
1548.2.19 by Barry.Leslie at PrimeBase
Fixes for longjmp clobber problem, (Hopefully)
180
			
181
		push_(thread);
182
		thread->stop();
183
		release_(thread);
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
184
	}
185
	exit_();
186
}
187
188
/*
189
 * ---------------------------------------------------------------
190
 * CSTHREAD
191
 */
192
193
void CSThread::addToList()
194
{
195
	if (myThreadList) {
196
		enter_();
197
		ASSERT(self == this);
198
		lock_(myThreadList);
199
		myThreadList->addFront(self);
200
		isRunning = true;
201
		unlock_(myThreadList);
202
		exit_();
203
	}
204
	else
205
		isRunning = true;
206
}
207
	
208
void CSThread::removeFromList()
209
{
210
	if (myThreadList && isRunning) {
211
		enter_();
212
		/* Retain the thread in order to ensure
213
		 * that after it is removed from the list,
214
		 * that it is not freed! This would make the
215
		 * unlock_() call invalid, because it requires
216
		 * on the thread.
217
		 */
218
		push_(this);
219
		lock_(myThreadList);
220
		myThreadList->remove(RETAIN(this));
221
		unlock_(myThreadList);
222
		pop_(this);
223
		outer_();
224
	}
225
	this->release();
226
}
227
1548.2.38 by Barry.Leslie at PrimeBase
Fixed some solaris build problems.
228
void *CSThread::dispatch(void *arg)
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
229
{
230
	CSThread		*self;
231
	void			*return_data = NULL;
232
	int				err;
233
234
	/* Get a reference to myself: */
235
	self = reinterpret_cast<CSThread*>(arg);
236
	ASSERT(self);
237
238
	/* Store my thread in the thread key: */
239
	if ((err = pthread_setspecific(CSThread::sThreadKey, self))) {
240
		CSException::logOSError(self, CS_CONTEXT, err);
241
		return NULL;
242
	}
243
244
	/*
245
	 * Make sure the thread is not freed while we
246
	 * are running:
247
	 */
248
	self->retain();
249
250
	try_(a) {
251
		td_setup_signals(NULL);
252
253
		/* Add the thread to the list: */
254
		self->addToList();
255
256
		// Run the task from the correct context
257
		return_data = self->run();
258
	}
259
	catch_(a) {
260
		self->logException();
261
	}
262
	cont_(a);
263
264
	/*
265
	 * Removing from the thread list will also release the thread.
266
	 */
267
	self->removeFromList();
268
269
	// Exit the thread
270
	return return_data;
271
}
272
1548.2.38 by Barry.Leslie at PrimeBase
Fixed some solaris build problems.
273
1643.1.8 by Monty Taylor
Fixed a couple of solaris build issues. Callback functions passed to
274
extern "C"
1643.1.10 by Monty Taylor
Fixed OSX build errors.
275
{
276
1548.2.38 by Barry.Leslie at PrimeBase
Fixed some solaris build problems.
277
static void *dispatch_wrapper(void *arg)
278
{
279
	return CSThread::dispatch(arg);
280
}
281
1643.1.10 by Monty Taylor
Fixed OSX build errors.
282
}
283
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
284
void *CSThread::run()
285
{
286
	if (iRunFunc)
287
		return iRunFunc();
288
	return NULL;
289
}
290
291
void CSThread::start()
292
{
293
	int err;
294
1548.2.38 by Barry.Leslie at PrimeBase
Fixed some solaris build problems.
295
	err = pthread_create(&iThread, NULL, dispatch_wrapper, (void *) this);
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
296
	if (err)
297
		CSException::throwOSError(CS_CONTEXT, err);
298
	while (!isRunning) {
299
		/* Check if the thread is still alive,
300
		 * so we don't hang forever.
301
		 */
302
		if (pthread_kill(iThread, 0))
303
			break;
304
		usleep(10);
305
	}
306
}
307
308
void CSThread::stop()
309
{
310
	signal(SIGTERM);
311
	join();
312
}
313
314
void *CSThread::join()
315
{
316
	void	*return_data;
317
	int		err;
318
319
	enter_();
320
	if ((err = pthread_join(iThread, &return_data)))
321
		CSException::throwOSError(CS_CONTEXT, err);
322
	return_(return_data);
323
}
324
325
void CSThread::setSignalPending(unsigned int sig)
326
{
327
	if (sig == SIGTERM)
328
		/* The terminate signal takes priority: */
329
		signalPending = SIGTERM;
330
	else if (!signalPending)
331
		/* Otherwise, first signal wins... */
332
		signalPending = sig;
333
}
334
335
void CSThread::signal(unsigned int sig)
336
{
337
	int err;
338
339
	setSignalPending(sig);
340
	if ((err = pthread_kill(iThread, SIGUSR2)))
341
	{
342
		/* Ignore the error if the process does not exist! */
343
		if (err != ESRCH) /* No such process */
344
			CSException::throwOSError(CS_CONTEXT, err);
345
	}
346
}
347
348
void CSThread::throwSignal()
349
{
350
	int sig;
351
352
	if ((sig = signalPending) && !ignoreSignals) {
353
		signalPending = 0;
354
		CSException::throwSignal(CS_CONTEXT, sig);
355
	}
356
}
357
358
bool CSThread::isMain()
359
{
360
	return iIsMain;
361
}
362
363
/*
364
 * -----------------------------------------------------------------------
365
 * THROWING EXCEPTIONS
366
 */
367
368
/* 
369
 * When an exception is .
370
 */
371
372
void CSThread::releaseObjects(CSReleasePtr top)
373
{
374
	CSObject *obj;
375
376
	while (relTop > top) {
377
		/* Remove and release or unlock the object on the top of the stack: */
378
		relTop--;
379
		switch(relTop->r_type) {
380
			case CS_RELEASE_OBJECT:
381
				if ((obj = relTop->x.r_object))
382
					obj->release();
383
				break;
384
			case CS_RELEASE_MUTEX:
385
				if (relTop->x.r_mutex)
386
					relTop->x.r_mutex->unlock();
387
				break;
388
			case CS_RELEASE_POOLED:
389
				if (relTop->x.r_pooled)
390
					relTop->x.r_pooled->returnToPool();
391
				break;
392
		}
393
	}
394
}
395
396
/* Throw an already registered error: */
397
void CSThread::throwException()
398
{
399
	/* Record the stack trace: */
400
	if (this->jumpDepth > 0 && this->jumpDepth <= CS_JUMP_STACK_SIZE) {
401
		/*
402
		 * As recommended by Barry:
403
		 * release the objects before we jump!
404
		 * This has the advantage that the stack context is still
405
		 * valid when the resources are released.
406
		 */
407
		releaseObjects(this->jumpEnv[this->jumpDepth-1].jb_res_top);
408
409
		/* Then do the longjmp: */
410
		longjmp(this->jumpEnv[this->jumpDepth-1].jb_buffer, 1);
411
	}
412
}
413
414
void CSThread::logStack(int depth, const char *msg)
415
{
416
	char buffer[CS_EXC_CONTEXT_SIZE +1];
417
	CSL.lock();
418
	CSL.log(this, CSLog::Trace, msg);
419
	
420
	for (int i= callTop-1; i>=0 && depth; i--, depth--) {
421
		cs_format_context(CS_EXC_CONTEXT_SIZE, buffer,
422
			callStack[i].cs_func, callStack[i].cs_file, callStack[i].cs_line);
423
		strcat(buffer, "\n");
424
		CSL.log(this, CSLog::Trace, buffer);
425
	}
426
	CSL.unlock();
427
}
428
429
void CSThread::logException()
430
{
431
	myException.log(this);
432
}
433
434
/*
435
 * This function is called when an exception is caught.
436
 * It restores the function call top and frees
437
 * any resource allocated by lower levels.
438
 */
439
void CSThread::caught()
440
{
441
	/* Restore the call top: */
442
	this->callTop = this->jumpEnv[this->jumpDepth].jb_call_top;
443
444
	/* 
445
	 * Release all all objects that were pushed after
446
	 * this jump position was set:
447
	 */
448
	releaseObjects(this->jumpEnv[this->jumpDepth].jb_res_top);
449
}
450
451
/*
452
 * ---------------------------------------------------------------
453
 * STATIC METHODS
454
 */
455
456
pthread_key_t	CSThread::sThreadKey;
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
457
bool			CSThread::isUp = false;
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
458
459
bool CSThread::startUp()
460
{
461
	int err;
462
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
463
	isUp = false;
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
464
	if ((err = pthread_key_create(&sThreadKey, NULL))) {
465
		CSException::logOSError(CS_CONTEXT, errno);
466
		return false;
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
467
	} else
468
		isUp = true;
469
		
470
	return isUp;
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
471
}
472
473
void CSThread::shutDown()
474
{
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
475
	isUp = false;
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
476
}
477
478
bool CSThread::attach(CSThread *thread)
479
{
480
	ASSERT(!getSelf());
481
	
482
	if (!thread) {
483
		CSException::logOSError(CS_CONTEXT, ENOMEM);
484
		return false;
485
	}
486
487
	if (!setSelf(thread))
488
		return false;
489
490
	/* Now we are ready to receive signals: */
491
	if (!td_setup_signals(thread))
492
		return false;
493
494
	thread->addToList();
495
	thread->retain();
496
	return true;
497
}
498
499
void CSThread::detach(CSThread *thread)
500
{
501
	ASSERT(!getSelf() || getSelf() == thread);
502
	thread->removeFromList();
503
	thread->release();
504
	pthread_setspecific(sThreadKey, NULL);
505
}
506
507
CSThread* CSThread::getSelf()
508
{
1548.2.20 by Barry.Leslie at PrimeBase
Code cleanup.
509
	CSThread* self = NULL;
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
510
	
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
511
	if ((!isUp) || !(self = (CSThread*) pthread_getspecific(sThreadKey)))
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
512
		return (CSThread*) NULL;
1548.2.2 by Barry.Leslie at PrimeBase
A lot of minor changes to clean up the code and to get it to build with Drizzle.
513
		
514
#ifdef DEBUG
515
	if (self->iRefCount == 0) {
516
		pthread_setspecific(sThreadKey, NULL);
517
		CSException::throwAssertion(CS_CONTEXT, "Bad self pointer.");
518
	}	
519
#endif
520
521
	return self;
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
522
}
523
524
bool CSThread::setSelf(CSThread *self)
525
{
526
	int err;
527
528
	if (self) {
529
		self->iThread = pthread_self();
530
531
		/* Store my thread in the thread key: */
532
		if ((err = pthread_setspecific(sThreadKey, self))) {
533
			self->myException.initOSError(CS_CONTEXT, err);
534
			self->myException.setStackTrace(self);
535
			return false;
536
		}
537
	}
538
	else
539
		pthread_setspecific(sThreadKey, NULL);
540
	return true;
541
}
542
543
/* timeout is in milliseconds */
544
void CSThread::sleep(unsigned long timeout)
545
{
546
	enter_();
547
	usleep(timeout * 1000);
548
	self->interrupted();
549
	exit_();
550
}
551
1548.2.23 by Barry.Leslie at PrimeBase
And more cleanup.
552
#ifdef DEBUG
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
553
int cs_assert(const char *func, const char *file, int line, const char *message)
554
{
555
	CSException::throwAssertion(func, file, line, message);
556
	return 0;
557
}
558
559
int cs_hope(const char *func, const char *file, int line, const char *message)
560
{
561
	CSException e;
562
		
563
	e.initAssertion(func, file, line, message);
564
	e.log(NULL);
565
	return 0;
566
}
1548.2.23 by Barry.Leslie at PrimeBase
And more cleanup.
567
#endif
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
568
569
CSThread *CSThread::newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list)
570
{
571
	CSThread *thd;
572
573
	enter_();
574
	if (!(thd = new CSThread(list))) {
575
		name->release();
576
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
577
	}
578
	thd->threadName = name;
579
	thd->iRunFunc = run_func;
580
	return_(thd);
581
}
582
583
CSThread *CSThread::newCSThread()
584
{
585
	CSThread *thd = NULL;
586
587
	if (!(thd = new CSThread(NULL))) {
588
		CSException::throwOSError(CS_CONTEXT, ENOMEM);
589
	}
590
	
591
	return thd;
592
}
593
594
/*
595
 * ---------------------------------------------------------------
596
 * DAEMON THREADS
597
 */
598
599
CSDaemon::CSDaemon(time_t wait_time, CSThreadList *list):
600
CSThread(list),
601
CSSync(),
602
myWaitTime(wait_time),
603
iSuspended(false),
604
iSuspendCount(0)
605
{
606
}
607
608
CSDaemon::CSDaemon(CSThreadList *list):
609
CSThread(list),
610
CSSync(),
611
myWaitTime(0),
612
iSuspended(false),
613
iSuspendCount(0)
614
{
615
}
616
617
void *CSDaemon::run()
618
{
619
	bool must_sleep = false;
620
1548.2.20 by Barry.Leslie at PrimeBase
Code cleanup.
621
	CLOBBER_PROTECT(must_sleep);
622
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
623
	enter_();
1548.2.21 by Barry.Leslie at PrimeBase
code cleanup.
624
	CLOBBER_PROTECT(self);
625
1548.2.11 by Barry.Leslie at PrimeBase
Removed libxml reqirement by using a home grown xml parser.
626
	myMustQuit = !initializeWork();
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
627
628
	restart:
629
	try_(a) {
630
		while (!myMustQuit) {
631
			if (must_sleep) {
632
				lock_(this);
633
				if (myWaitTime)
634
					suspendedWait(myWaitTime);
635
				else
636
					suspendedWait();
637
				unlock_(this);
638
				if (myMustQuit)
639
					break;
640
			}
641
			must_sleep = doWork();
642
		}
643
	}
644
	catch_(a) {
645
		if (!handleException())
646
			myMustQuit = true;
647
	}
648
	cont_(a);
649
	if (!myMustQuit) {
650
		must_sleep = true;
651
		goto restart;
652
	}
653
1548.2.11 by Barry.Leslie at PrimeBase
Removed libxml reqirement by using a home grown xml parser.
654
	/* Prevent signals from going off in completeWork! */
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
655
	ignoreSignals = true;
656
1548.2.11 by Barry.Leslie at PrimeBase
Removed libxml reqirement by using a home grown xml parser.
657
	return_(completeWork());
1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
658
}
659
660
bool CSDaemon::doWork()
661
{
662
	if (iRunFunc)
663
		(void) iRunFunc();
664
	return true;
665
}
666
667
bool CSDaemon::handleException()
668
{
669
	if (!myMustQuit)
670
		logException();
671
	return true;
672
}
673
674
void CSDaemon::wakeup()
675
{
676
	CSSync::wakeup();
677
}
678
679
void CSDaemon::stop()
680
{
681
	myMustQuit = true;
682
	wakeup();
683
	signal(SIGTERM);
684
	join();
685
}
686
687
void CSDaemon::suspend()
688
{
689
	enter_();
690
	lock_(this);
691
	iSuspendCount++;
692
	while (!iSuspended && !myMustQuit)
693
		wait(500);
694
	if (!iSuspended)
695
		iSuspendCount--;
696
	unlock_(this);
697
	exit_();
698
}
699
700
void CSDaemon::resume()
701
{
702
	enter_();
703
	lock_(this);
704
	if (iSuspendCount > 0)
705
		iSuspendCount--;
706
	wakeup();
707
	unlock_(this);
708
	exit_();
709
}
710
711
void CSDaemon::suspended()
712
{
713
	if (!iSuspendCount || myMustQuit) {
714
		iSuspended = false;
715
		return;
716
	}
717
	enter_();
718
	lock_(this);
719
	while (iSuspendCount && !myMustQuit) {
720
		iSuspended = true;
721
		wait(500);
722
	}
723
	iSuspended = false;
724
	unlock_(this);
725
	exit_();
726
}
727
728
void CSDaemon::suspendedWait()
729
{
730
	iSuspended = true;
731
	wait();
732
	if (iSuspendCount)
733
		suspended();
734
}
735
736
void CSDaemon::suspendedWait(time_t milli_sec)
737
{
738
	iSuspended = true;
739
	wait(milli_sec);
740
	if (iSuspendCount)
741
		suspended();
742
	else
743
		iSuspended = false;
744
}
745
746
/*
747
 * ---------------------------------------------------------------
748
 * THREAD POOLS
749
 */
750
751