262
254
pthread_mutex_unlock(&LOCK_session_add);
268
* Derived class for pool of threads scheduler.
270
class PoolOfThreadsScheduler: public plugin::Scheduler
276
PoolOfThreadsScheduler(): Scheduler()
278
struct sched_param tmp_sched_param;
280
memset(&tmp_sched_param, 0, sizeof(struct sched_param));
281
/* Setup attribute parameter for session threads. */
282
(void) pthread_attr_init(&attr);
283
(void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
284
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
286
tmp_sched_param.sched_priority= WAIT_PRIOR;
287
(void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
290
~PoolOfThreadsScheduler()
292
(void) pthread_mutex_lock(&LOCK_thread_count);
294
kill_pool_threads= true;
295
while (created_threads)
298
Wake up the event loop
301
size_t written= write(session_add_pipe[1], &c, sizeof(c));
302
assert(written == sizeof(c));
304
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
306
(void) pthread_mutex_unlock(&LOCK_thread_count);
308
event_del(&session_add_event);
309
close(session_add_pipe[0]);
310
close(session_add_pipe[1]);
311
event_del(&session_kill_event);
312
close(session_kill_pipe[0]);
313
close(session_kill_pipe[1]);
315
(void) pthread_mutex_destroy(&LOCK_event_loop);
316
(void) pthread_mutex_destroy(&LOCK_session_add);
317
(void) pthread_mutex_destroy(&LOCK_session_kill);
318
(void) pthread_attr_destroy(&attr);
323
* Notify the thread pool about a new connection
325
* @param[in] the newly connected session
328
* True if there is an error.
330
virtual bool addSession(Session *session)
332
assert(session->scheduler_arg == NULL);
333
session_scheduler *scheduler= new session_scheduler(session);
335
if (scheduler == NULL)
338
session->scheduler_arg= (void *)scheduler;
340
libevent_session_add(session);
348
* Signal a waiting connection it's time to die.
351
* This function will signal libevent the Session should be killed.
353
* @param[in] session The connection to kill
355
virtual void killSession(Session *session)
359
pthread_mutex_lock(&LOCK_session_kill);
361
if (sessions_to_be_killed.empty())
364
Notify libevent with the killing event if this's the first killing
365
notification of the batch
367
size_t written= write(session_kill_pipe[1], &c, sizeof(c));
368
assert(written == sizeof(c));
372
Push into the sessions_to_be_killed queue
374
sessions_to_be_killed.push(session);
375
pthread_mutex_unlock(&LOCK_session_kill);
380
* Create all threads for the thread pool
383
* After threads are created we wait until all threads has signaled that
384
* they have started before we return
387
* @retval 1 We got an error creating the thread pool. In this case we will abort all created threads.
389
bool libevent_init(void)
395
pthread_mutex_init(&LOCK_event_loop, NULL);
396
pthread_mutex_init(&LOCK_session_add, NULL);
397
pthread_mutex_init(&LOCK_session_kill, NULL);
399
/* Set up the pipe used to add new sessions to the event pool */
400
if (init_pipe(session_add_pipe))
402
errmsg_printf(ERRMSG_LVL_ERROR,
403
_("init_pipe(session_add_pipe) error in libevent_init\n"));
406
/* Set up the pipe used to kill sessions in the event queue */
407
if (init_pipe(session_kill_pipe))
409
errmsg_printf(ERRMSG_LVL_ERROR,
410
_("init_pipe(session_kill_pipe) error in libevent_init\n"));
411
close(session_add_pipe[0]);
412
close(session_add_pipe[1]);
415
event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
416
libevent_add_session_callback, NULL);
417
event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
418
libevent_kill_session_callback, NULL);
420
if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
422
errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
426
/* Set up the thread pool */
427
pthread_mutex_lock(&LOCK_thread_count);
429
for (x= 0; x < size; x++)
433
if ((error= pthread_create(&thread, &attr, libevent_thread_proc, 0)))
435
errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
437
pthread_mutex_unlock(&LOCK_thread_count);
442
/* Wait until all threads are created */
443
while (created_threads != size)
444
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
445
pthread_mutex_unlock(&LOCK_thread_count);
454
* Factory class used to produce a Pool_of_threads_scheduler
456
class PoolOfThreadsFactory : public plugin::SchedulerFactory
459
PoolOfThreadsFactory() : SchedulerFactory("pool_of_threads") {}
460
~PoolOfThreadsFactory() { if (scheduler != NULL) delete scheduler; }
461
plugin::Scheduler *operator() ()
463
if (scheduler == NULL)
465
PoolOfThreadsScheduler *pot= new PoolOfThreadsScheduler();
466
if (pot->libevent_init())
479
259
* Close and delete a connection.
686
static PoolOfThreadsFactory *factory= NULL;
466
PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
468
(void) pthread_mutex_lock(&LOCK_thread_count);
470
kill_pool_threads= true;
471
while (created_threads)
474
Wake up the event loop
477
size_t written= write(session_add_pipe[1], &c, sizeof(c));
478
assert(written == sizeof(c));
480
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
482
(void) pthread_mutex_unlock(&LOCK_thread_count);
484
event_del(&session_add_event);
485
close(session_add_pipe[0]);
486
close(session_add_pipe[1]);
487
event_del(&session_kill_event);
488
close(session_kill_pipe[0]);
489
close(session_kill_pipe[1]);
491
(void) pthread_mutex_destroy(&LOCK_event_loop);
492
(void) pthread_mutex_destroy(&LOCK_session_add);
493
(void) pthread_mutex_destroy(&LOCK_session_kill);
494
(void) pthread_attr_destroy(&attr);
498
bool PoolOfThreadsScheduler::addSession(Session *session)
500
assert(session->scheduler_arg == NULL);
501
session_scheduler *scheduler= new session_scheduler(session);
503
if (scheduler == NULL)
506
session->scheduler_arg= (void *)scheduler;
508
libevent_session_add(session);
514
void PoolOfThreadsScheduler::killSession(Session *session)
518
pthread_mutex_lock(&LOCK_session_kill);
520
if (sessions_to_be_killed.empty())
523
Notify libevent with the killing event if this's the first killing
524
notification of the batch
526
size_t written= write(session_kill_pipe[1], &c, sizeof(c));
527
assert(written == sizeof(c));
531
Push into the sessions_to_be_killed queue
533
sessions_to_be_killed.push(session);
534
pthread_mutex_unlock(&LOCK_session_kill);
538
bool PoolOfThreadsScheduler::libevent_init(void)
544
pthread_mutex_init(&LOCK_event_loop, NULL);
545
pthread_mutex_init(&LOCK_session_add, NULL);
546
pthread_mutex_init(&LOCK_session_kill, NULL);
548
/* Set up the pipe used to add new sessions to the event pool */
549
if (init_pipe(session_add_pipe))
551
errmsg_printf(ERRMSG_LVL_ERROR,
552
_("init_pipe(session_add_pipe) error in libevent_init\n"));
555
/* Set up the pipe used to kill sessions in the event queue */
556
if (init_pipe(session_kill_pipe))
558
errmsg_printf(ERRMSG_LVL_ERROR,
559
_("init_pipe(session_kill_pipe) error in libevent_init\n"));
560
close(session_add_pipe[0]);
561
close(session_add_pipe[1]);
564
event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
565
libevent_add_session_callback, NULL);
566
event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
567
libevent_kill_session_callback, NULL);
569
if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
571
errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
575
/* Set up the thread pool */
576
pthread_mutex_lock(&LOCK_thread_count);
578
for (x= 0; x < size; x++)
582
if ((error= pthread_create(&thread, &attr, libevent_thread_proc, 0)))
584
errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
586
pthread_mutex_unlock(&LOCK_thread_count);
591
/* Wait until all threads are created */
592
while (created_threads != size)
593
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
594
pthread_mutex_unlock(&LOCK_thread_count);