1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh (H&G2JCtL)
20
* Continued development: Barry Leslie
25
* An independently running thread.
39
#include <sys/signal.h>
46
#include "CSException.h"
48
#include "CSStrUtil.h"
52
* ---------------------------------------------------------------
59
// Signal handler. td_setup_signals() stores it in action.sa_handler ahead of
// its sigaction(SIGUSR2, ...) call.
// When the calling thread has a CSThread (CSThread::getSelf() is non-NULL):
//   - the signal is passed to every thread on self->myThreadList, if set;
//   - the signal is then recorded on this thread with setSignalPending().
// NOTE(review): this runs in signal context; whether signalAllThreads() is
// async-signal-safe cannot be judged from the visible lines - confirm.
static void td_catch_signal(int sig)
63
if ((self = CSThread::getSelf())) {
65
/* The main thread will pass on a signal to all threads: */
66
if (self->myThreadList)
67
self->myThreadList->signalAllThreads(sig);
68
self->setSignalPending(sig);
74
// Second signal handler. td_setup_signals() stores it in action.sa_handler
// after the SIGUSR2 registration; the signals it is bound to are not visible.
// The visible statements mirror td_catch_signal(): forward the signal to the
// threads on self->myThreadList (if set) and call setSignalPending().
// NOTE(review): one source line between the forward and setSignalPending()
// is missing from this view, so the exact control flow (and whatever makes
// this the "throw" variant) must be confirmed against the full file.
static void td_throw_signal(int sig)
78
if ((self = CSThread::getSelf())) {
80
/* The main thread will pass on a signal to all threads: */
81
if (self->myThreadList)
82
self->myThreadList->signalAllThreads(sig);
84
self->setSignalPending(sig);
89
// Installs the signal handlers used by CSThread.
//   thread - thread whose myException receives the OS error on failure.
//            CSThread::dispatch() passes NULL, CSThread::attach() passes a
//            real thread.
// Visible behaviour:
//   - starts from an empty signal mask (sigemptyset);
//   - registers td_catch_signal for SIGUSR2;
//   - switches the handler to td_throw_signal for registrations that are not
//     visible in this view;
//   - on failure either records errno in thread->myException (with a stack
//     trace) or throws an OS error via CSException::throwOSError().
// NOTE(review): the condition choosing between "record" and "throw" is not
// visible; presumably it tests thread for NULL - confirm.
static bool td_setup_signals(CSThread *thread)
94
struct sigaction action;
96
sigemptyset(&action.sa_mask);
99
action.sa_handler = td_catch_signal;
101
if (sigaction(SIGUSR2, &action, NULL) == -1)
104
action.sa_handler = td_throw_signal;
111
thread->myException.initOSError(CS_CONTEXT, errno);
112
thread->myException.setStackTrace(thread);
115
CSException::throwOSError(CS_CONTEXT, errno);
123
* ---------------------------------------------------------------
127
// Walks the thread list, starting at getBack() and advancing with
// getNextLink().
//   sig - signal number handed in by the caller.
// NOTE(review): the per-thread action and any locking are on lines that are
// not visible here; presumably each thread is signalled with sig - confirm.
void CSThreadList::signalAllThreads(int sig)
133
ptr = (CSThread *) getBack();
137
ptr = (CSThread *) ptr->getNextLink();
144
// Walks the thread list from getBack() via getNextLink() and sets
// myMustQuit = true on each thread it visits.
// NOTE(review): loop framing and locking are not visible in this view.
void CSThreadList::quitAllThreads()
151
ptr = (CSThread *) getBack();
154
ptr->myMustQuit = true;
155
ptr = (CSThread *) ptr->getNextLink();
162
// Searches the list, starting at getBack() and advancing with getNextLink(),
// for a thread other than the caller (per the in-body comment).
// NOTE(review): what is done with the thread that is found is not visible
// here; the name suggests it is stopped - confirm.
void CSThreadList::stopAllThreads()
168
/* Get a thread that is not self! */
170
if ((thread = (CSThread *) getBack())) {
174
thread = (CSThread *) thread->getNextLink();
192
* ---------------------------------------------------------------
196
// Adds this thread to the front of myThreadList.
// Asserts that the calling thread (self) is this object, i.e. a thread only
// ever adds itself.
// The list is released with unlock_() afterwards; the matching lock call is
// on a line not visible in this view.
void CSThread::addToList()
200
ASSERT(self == this);
202
myThreadList->addFront(self);
204
unlock_(myThreadList);
211
// Removes this thread from myThreadList. Only acts when the thread has a
// list and isRunning is set.
// The thread is retained (RETAIN) for the remove() call so that, as the
// in-body comment explains, it is not freed before unlock_() has run.
void CSThread::removeFromList()
213
if (myThreadList && isRunning) {
214
CSThread *myself = this; // pop_() wants to take a reference to its parameter.
216
/* Retain the thread in order to ensure
217
* that after it is removed from the list,
218
* that it is not freed! This would make the
219
* unlock_() call invalid, because it requires
224
myThreadList->remove(RETAIN(myself));
225
unlock_(myThreadList);
232
// Body of a newly created thread (reached through dispatch_wrapper()).
//   arg - the CSThread object for the new thread, passed as void*.
// Visible sequence:
//   1. store the CSThread in the per-thread key sThreadKey, logging the OS
//      error if pthread_setspecific() fails;
//   2. install the signal handlers with td_setup_signals(NULL);
//   3. call self->run() and keep its result in return_data;
//   4. self->logException();
//   5. self->removeFromList(), which per the comment below also releases
//      the thread.
// NOTE(review): the try/catch framing, the add-to-list call and the return
// statement are on lines not visible here; step 4 presumably belongs to the
// exception path only - confirm.
void *CSThread::dispatch(void *arg)
235
void *return_data = NULL;
238
/* Get a reference to myself: */
239
self = reinterpret_cast<CSThread*>(arg);
242
/* Store my thread in the thread key: */
243
if ((err = pthread_setspecific(CSThread::sThreadKey, self))) {
244
CSException::logOSError(self, CS_CONTEXT, err);
249
* Make sure the thread is not freed while we
255
td_setup_signals(NULL);
257
/* Add the thread to the list: */
260
// Run the task from the correct context
261
return_data = self->run();
264
self->logException();
269
* Removing from the thread list will also release the thread.
271
self->removeFromList();
281
// Free-function trampoline handed to pthread_create() by CSThread::start().
// Forwards arg unchanged to CSThread::dispatch() and returns its result.
static void *dispatch_wrapper(void *arg)
283
return CSThread::dispatch(arg);
288
void *CSThread::run()
295
// Creates the OS thread for this object, with dispatch_wrapper() as entry
// point and this as its argument.
//   detached - stored in isDetached.
// Visible behaviour:
//   - an OS error is thrown with the pthread_create() result;
//   - pthread_kill(iThread, 0) is used as a liveness probe so the caller
//     does not wait forever on a thread that has already gone;
//   - pthread_detach(iThread) is called.
// NOTE(review): the conditions guarding the throw and the detach, and the
// wait loop around the probe, are on lines not visible here.
void CSThread::start(bool detached)
299
err = pthread_create(&iThread, NULL, dispatch_wrapper, (void *) this);
301
CSException::throwOSError(CS_CONTEXT, err);
303
/* Check if the thread is still alive,
304
* so we don't hang forever.
306
if (pthread_kill(iThread, 0))
311
isDetached = detached;
313
pthread_detach(iThread);
316
void CSThread::stop()
322
// Waits for the thread to finish and returns return_data (initially NULL).
// Two paths are visible:
//   - a polling loop that continues while isRunning is set and
//     pthread_kill(iThread, 0) reports the thread as alive;
//   - otherwise pthread_join(), whose failure is thrown as an OS error.
// NOTE(review): the condition selecting the polling path is not visible;
// presumably it is the detached case, where pthread_join() is not allowed.
void *CSThread::join()
324
void *return_data = NULL;
329
while (isRunning && !pthread_kill(iThread, 0))
331
} else if ((err = pthread_join(iThread, &return_data))) {
332
CSException::throwOSError(CS_CONTEXT, err);
335
return_(return_data);
338
// Records a signal in signalPending for later delivery.
//   sig - signal number to record.
// Priority rule visible below: the terminate case stores SIGTERM
// unconditionally; any other signal is handled only when nothing is pending
// yet ("first signal wins").
// NOTE(review): the test that selects the SIGTERM branch and the assignment
// in the else branch are on lines not visible here.
void CSThread::setSignalPending(unsigned int sig)
341
/* The terminate signal takes priority: */
342
signalPending = SIGTERM;
343
else if (!signalPending)
344
/* Otherwise, first signal wins... */
348
// Delivers a signal to this thread.
//   sig - signal number to record as pending.
// The requested signal is stored with setSignalPending(); the OS-level
// signal actually sent to the thread is always SIGUSR2.
// pthread_kill() failures are thrown as OS errors, except ESRCH (target no
// longer exists), which is ignored.
// The code below the #ifndef is excluded when OS_WINDOWS is defined.
void CSThread::signal(unsigned int sig)
350
#ifndef OS_WINDOWS // Currently you cannot signal threads on windows.
353
setSignalPending(sig);
354
if ((err = pthread_kill(iThread, SIGUSR2)))
356
/* Ignore the error if the process does not exist! */
357
if (err != ESRCH) /* No such process */
358
CSException::throwOSError(CS_CONTEXT, err);
363
// Turns a pending signal into an exception: when signalPending is non-zero
// and ignoreSignals is false, calls CSException::throwSignal() with it.
// NOTE(review): one line inside the if is not visible; presumably it clears
// signalPending before throwing - confirm.
void CSThread::throwSignal()
367
if ((sig = signalPending) && !ignoreSignals) {
369
CSException::throwSignal(CS_CONTEXT, sig);
373
bool CSThread::isMain()
379
* -----------------------------------------------------------------------
380
* THROWING EXCEPTIONS
384
* When an exception is .
387
// Unwinds the release stack while relTop is above top, handling each entry
// by its r_type.
//   top - release-stack position to unwind to.
// Visible handling per entry type:
//   CS_RELEASE_OBJECT     - acts on x.r_object when it is non-NULL;
//   CS_RELEASE_MUTEX      - unlocks x.r_mutex when it is non-NULL;
//   CS_RELEASE_POOLED     - returns x.r_pooled to its pool when non-NULL;
//   (case label missing)  - frees x.r_mem with cs_free();
//   CS_RELEASE_OBJECT_PTR - acts on the object x.r_objectPtr points at when
//                           both the pointer and the object are non-NULL.
// NOTE(review): the statements applied to obj, the break statements and the
// stack-pointer decrement are on lines not visible here.
void CSThread::releaseObjects(CSReleasePtr top)
391
while (relTop > top) {
392
/* Remove and release or unlock the object on the top of the stack: */
394
switch(relTop->r_type) {
395
case CS_RELEASE_OBJECT:
396
if ((obj = relTop->x.r_object))
399
case CS_RELEASE_MUTEX:
400
if (relTop->x.r_mutex)
401
relTop->x.r_mutex->unlock();
403
case CS_RELEASE_POOLED:
404
if (relTop->x.r_pooled)
405
relTop->x.r_pooled->returnToPool();
409
cs_free(relTop->x.r_mem);
411
case CS_RELEASE_OBJECT_PTR:
412
if ((relTop->x.r_objectPtr) && (obj = *(relTop->x.r_objectPtr)))
419
/* Throw an already registered error: */
420
// Raises the exception already registered on this thread by jumping to the
// innermost saved jump frame.
// Only taken when 0 < jumpDepth <= CS_JUMP_STACK_SIZE. Before the longjmp()
// (with value 1), every resource pushed since that frame was set up is
// released via releaseObjects(), while the current stack is still valid.
// NOTE(review): the handling for an out-of-range jumpDepth is on lines not
// visible here.
void CSThread::throwException()
422
/* Record the stack trace: */
423
if (this->jumpDepth > 0 && this->jumpDepth <= CS_JUMP_STACK_SIZE) {
425
* As recommended by Barry:
426
* release the objects before we jump!
427
* This has the advantage that the stack context is still
428
* valid when the resources are released.
430
releaseObjects(this->jumpEnv[this->jumpDepth-1].jb_res_top);
432
/* Then do the longjmp: */
433
longjmp(this->jumpEnv[this->jumpDepth-1].jb_buffer, 1);
437
// Writes msg and then up to depth call-stack entries to the trace log,
// newest entry first (from callTop-1 down to 0).
//   depth - maximum number of entries to log; 0 logs none, and a negative
//           value never reaches 0, so it logs every entry.
//   msg   - text logged ahead of the stack entries.
// Each entry is formatted from its function, file and line with
// cs_format_context() and gets a newline appended.
// NOTE(review): strcat() adds one character to what cs_format_context()
// wrote; the buffer has CS_EXC_CONTEXT_SIZE + 1 bytes, so this is only safe
// if cs_format_context() counts its terminator within CS_EXC_CONTEXT_SIZE -
// confirm.
void CSThread::logStack(int depth, const char *msg)
439
char buffer[CS_EXC_CONTEXT_SIZE +1];
441
CSL.log(this, CSLog::Trace, msg);
443
for (int i= callTop-1; i>=0 && depth; i--, depth--) {
444
cs_format_context(CS_EXC_CONTEXT_SIZE, buffer,
445
callStack[i].cs_func, callStack[i].cs_file, callStack[i].cs_line);
446
strcat(buffer, "\n");
447
CSL.log(this, CSLog::Trace, buffer);
452
// Logs the exception currently stored in myException, passing this thread
// to CSException::log().
void CSThread::logException()
454
myException.log(this);
458
* This function is called when an exception is caught.
459
* It restores the function call top and frees
460
* any resource allocated by lower levels.
462
void CSThread::caught()
464
/* Restore the call top: */
465
this->callTop = this->jumpEnv[this->jumpDepth].jb_call_top;
468
* Release all objects that were pushed after
469
* this jump position was set:
471
releaseObjects(this->jumpEnv[this->jumpDepth].jb_res_top);
475
* ---------------------------------------------------------------
479
// Definitions of CSThread's static members.
// sThreadKey: pthread key holding each thread's CSThread pointer; written
//             with pthread_setspecific() and read in getSelf().
// isUp:       initially false; getSelf() returns NULL while it is false.
pthread_key_t CSThread::sThreadKey;
480
bool CSThread::isUp = false;
482
// One-time start-up of the thread layer: creates the pthread key
// (sThreadKey) that holds each thread's CSThread pointer.
// If the key cannot be created, the OS error is logged.
// NOTE(review): the return statements and the update of isUp are on lines
// not visible in this view.
bool CSThread::startUp()
487
if ((err = pthread_key_create(&sThreadKey, NULL))) {
488
// pthread_key_create() reports failure through its return value and does
// not set errno, so the returned code is the one to log.
CSException::logOSError(CS_CONTEXT, err);
495
void CSThread::shutDown()
500
// Binds an existing CSThread object to the calling OS thread.
//   thread - the CSThread to attach.
// Visible steps:
//   - an ENOMEM OS error is logged on a path whose condition is not visible
//     (presumably thread == NULL - confirm);
//   - the thread is registered as the caller's own with setSelf();
//   - the signal handlers are installed with td_setup_signals(thread).
// NOTE(review): the return values and the failure clean-up are on lines not
// visible here.
bool CSThread::attach(CSThread *thread)
505
CSException::logOSError(CS_CONTEXT, ENOMEM);
509
if (!setSelf(thread))
512
/* Now we are ready to receive signals: */
513
if (!td_setup_signals(thread))
521
// Undoes attach() for the calling thread.
//   thread - the CSThread to detach; asserted to be the caller's own thread
//            object unless the caller has none.
// Removes the thread from its thread list and clears the per-thread key.
void CSThread::detach(CSThread *thread)
523
ASSERT(!getSelf() || getSelf() == thread);
524
thread->removeFromList();
526
pthread_setspecific(sThreadKey, NULL);
529
// Returns the CSThread stored under sThreadKey for the calling thread.
// Returns NULL when the thread layer is not up (isUp is false) or no
// CSThread has been stored for this thread.
// If the stored object has a reference count of zero, the key is cleared
// and an assertion exception ("Freed self pointer referenced.") is thrown.
// NOTE(review): the final return statement is on a line not visible here.
CSThread* CSThread::getSelf()
531
CSThread* self = NULL;
533
if ((!isUp) || !(self = (CSThread*) pthread_getspecific(sThreadKey)))
534
return (CSThread*) NULL;
537
/* PMC - Problem is, if this is called when releasing a
538
* thread, then we have the reference count equal to
540
if (self && !self->iRefCount) {
541
pthread_setspecific(sThreadKey, NULL);
542
CSException::throwAssertion(CS_CONTEXT, "Freed self pointer referenced.");
550
// Makes self the CSThread of the calling OS thread.
//   self - thread object to register.
// Records pthread_self() in self->iThread and stores self under sThreadKey.
// If storing fails, the OS error and a stack trace are recorded in
// self->myException.
// A separate visible path clears the key (pthread_setspecific with NULL).
// NOTE(review): the condition for the clearing path (presumably a NULL
// self) and the return values are on lines not visible here.
bool CSThread::setSelf(CSThread *self)
555
self->iThread = pthread_self();
557
/* Store my thread in the thread key: */
558
if ((err = pthread_setspecific(sThreadKey, self))) {
559
self->myException.initOSError(CS_CONTEXT, err);
560
self->myException.setStackTrace(self);
565
pthread_setspecific(sThreadKey, NULL);
569
/* timeout is in milliseconds */
570
// Suspends the calling thread.
//   timeout - duration in milliseconds; multiplied by 1000 to get the
//             microsecond argument of usleep().
// NOTE(review): POSIX allows usleep() to fail with EINVAL for arguments of
// 1,000,000 or more, and the multiplication can wrap where unsigned long is
// 32 bits wide - confirm the callers only pass short timeouts.
void CSThread::sleep(unsigned long timeout)
573
usleep(timeout * 1000);
579
// Assertion-failure hook: throws an assertion exception built from the
// given location and message.
//   func, file, line - source location of the failed assertion.
//   message          - text describing the failed assertion.
// NOTE(review): the int return statement is on a line not visible here.
int cs_assert(const char *func, const char *file, int line, const char *message)
581
CSException::throwAssertion(func, file, line, message);
585
// Counterpart of cs_assert(): initialises an exception object e as an
// assertion for the given location and message (same parameters as
// cs_assert()).
// NOTE(review): what is done with e afterwards is not visible here;
// presumably it is logged rather than thrown - confirm.
int cs_hope(const char *func, const char *file, int line, const char *message)
589
e.initAssertion(func, file, line, message);
595
// Factory for a CSThread that belongs to list.
//   name     - stored in threadName.
//   run_func - stored in iRunFunc.
//   list     - thread list passed to the CSThread constructor.
// Throws an ENOMEM OS error if the allocation yields NULL.
// NOTE(review): a standard new expression throws std::bad_alloc instead of
// returning NULL, so this check only fires if the class supplies a
// non-throwing operator new - confirm. The return statement is not visible.
CSThread *CSThread::newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list)
600
if (!(thd = new CSThread(list))) {
602
CSException::throwOSError(CS_CONTEXT, ENOMEM);
604
thd->threadName = name;
605
thd->iRunFunc = run_func;
609
// Factory for a CSThread without a thread list (constructor receives NULL).
// Throws an ENOMEM OS error if the allocation yields NULL.
// NOTE(review): as with newThread(), a standard new expression never
// returns NULL; the return statement is on a line not visible here.
CSThread *CSThread::newCSThread()
611
CSThread *thd = NULL;
613
if (!(thd = new CSThread(NULL))) {
614
CSException::throwOSError(CS_CONTEXT, ENOMEM);
621
* ---------------------------------------------------------------
625
// Constructs a daemon.
//   wait_time - stored in myWaitTime; run()/try_Run() hand it to
//               suspendedWait().
//   list      - thread list. NOTE(review): presumably forwarded to the base
//               class; the rest of the initialiser list is not visible.
CSDaemon::CSDaemon(time_t wait_time, CSThreadList *list):
628
myWaitTime(wait_time),
634
CSDaemon::CSDaemon(CSThreadList *list):
643
// Inner work loop of the daemon, called repeatedly from run().
//   self         - the calling thread.
//   c_must_sleep - initial value for the local must_sleep flag; copied into
//                  a local so that longjmp() cannot clobber it.
// While myMustQuit is false the loop waits with suspendedWait(myWaitTime),
// calls doWork() and stores its result in must_sleep.
// handleException() is consulted on the error path.
// NOTE(review): the try/catch framing, the use of must_sleep to decide on
// waiting, and the action taken when handleException() returns false are on
// lines not visible here.
void CSDaemon::try_Run(CSThread *self, const bool c_must_sleep)
646
bool must_sleep = c_must_sleep; // This done to avoid longjmp() clobber.
647
while (!myMustQuit) {
651
suspendedWait(myWaitTime);
658
must_sleep = doWork();
662
if (!handleException())
668
// Thread body of the daemon.
//   1. initializeWork(); a false result sets myMustQuit, so the loop is
//      skipped.
//   2. calls try_Run() repeatedly until myMustQuit is set.
//   3. sets ignoreSignals so that no signal is raised during completeWork().
//   4. returns the result of completeWork().
void *CSDaemon::run()
670
bool must_sleep = false;
674
myMustQuit = !initializeWork();
676
while (!myMustQuit) {
677
try_Run(self, must_sleep);
681
/* Prevent signals from going off in completeWork! */
682
ignoreSignals = true;
684
return_(completeWork());
687
bool CSDaemon::doWork()
694
bool CSDaemon::handleException()
701
void CSDaemon::wakeup()
706
void CSDaemon::stop()
714
// Requests suspension of the daemon and loops until either iSuspended is
// set or the daemon has been told to quit (myMustQuit).
// NOTE(review): the loop body and any locking or counter update are on
// lines not visible here.
void CSDaemon::suspend()
719
while (!iSuspended && !myMustQuit)
727
// Counterpart of suspend(): acts only when iSuspendCount is positive.
// NOTE(review): the guarded statement is not visible here; presumably it
// decrements iSuspendCount and wakes the daemon - confirm.
void CSDaemon::resume()
731
if (iSuspendCount > 0)
738
// Suspension handling on the daemon side. Takes an early path when there
// is no suspend request (iSuspendCount is zero) or the daemon must quit;
// otherwise loops while a suspend request is outstanding and myMustQuit is
// not set.
// NOTE(review): the bodies of the if and of the loop are on lines not
// visible here.
void CSDaemon::suspended()
740
if (!iSuspendCount || myMustQuit) {
746
while (iSuspendCount && !myMustQuit) {
755
void CSDaemon::suspendedWait()
763
void CSDaemon::suspendedWait(time_t milli_sec)
774
* ---------------------------------------------------------------