~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pool_of_threads/pool_of_threads.cc

  • Committer: Brian Aker
  • Date: 2009-10-01 22:56:26 UTC
  • mto: (1154.1.1 staging)
  • mto: This revision was merged to the branch mainline in revision 1155.
  • Revision ID: brian@gaz-20091001225626-sb1pdykpxlnkheaj
Remove Factory/make scheduler work like everything else.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19
19
 */
20
20
 
21
 
#include <drizzled/server_includes.h>
22
 
#include <drizzled/gettext.h>
23
 
#include <drizzled/error.h>
24
 
#include <drizzled/plugin/scheduler.h>
25
 
#include <drizzled/sql_parse.h>
26
 
#include <drizzled/session.h>
27
 
#include <drizzled/plugin/client.h>
28
 
#include "session_scheduler.h"
29
 
#include <string>
30
 
#include <queue>
31
 
#include <set>
32
 
#include <event.h>
 
21
#include <plugin/pool_of_threads/pool_of_threads.h>
33
22
 
34
23
using namespace std;
35
24
using namespace drizzled;
37
26
/* We add this so as to avoid ambiguity with "struct queue" defined in Solaris system header */
38
27
using std::queue;
39
28
 
 
29
/* Global's (TBR) */
 
30
static PoolOfThreadsScheduler *scheduler= NULL;
 
31
 
40
32
/**
41
33
 * Set this to true to trigger killing of all threads in the pool
42
34
 */
262
254
  pthread_mutex_unlock(&LOCK_session_add);
263
255
}
264
256
 
265
 
 
266
 
/**
267
 
 * @brief 
268
 
 *  Derived class for pool of threads scheduler.
269
 
 */
270
 
class PoolOfThreadsScheduler: public plugin::Scheduler
271
 
{
272
 
private:
273
 
  pthread_attr_t attr;
274
 
 
275
 
public:
276
 
  PoolOfThreadsScheduler(): Scheduler()
277
 
  {
278
 
    struct sched_param tmp_sched_param;
279
 
 
280
 
    memset(&tmp_sched_param, 0, sizeof(struct sched_param));
281
 
    /* Setup attribute parameter for session threads. */
282
 
    (void) pthread_attr_init(&attr);
283
 
    (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
284
 
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
285
 
 
286
 
    tmp_sched_param.sched_priority= WAIT_PRIOR;
287
 
    (void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
288
 
  }
289
 
 
290
 
  ~PoolOfThreadsScheduler()
291
 
  {
292
 
    (void) pthread_mutex_lock(&LOCK_thread_count);
293
 
  
294
 
    kill_pool_threads= true;
295
 
    while (created_threads)
296
 
    {
297
 
      /* 
298
 
       Wake up the event loop 
299
 
      */
300
 
      char c= 0;
301
 
      size_t written= write(session_add_pipe[1], &c, sizeof(c));
302
 
      assert(written == sizeof(c));
303
 
  
304
 
      pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
305
 
    }
306
 
    (void) pthread_mutex_unlock(&LOCK_thread_count);
307
 
  
308
 
    event_del(&session_add_event);
309
 
    close(session_add_pipe[0]);
310
 
    close(session_add_pipe[1]);
311
 
    event_del(&session_kill_event);
312
 
    close(session_kill_pipe[0]);
313
 
    close(session_kill_pipe[1]);
314
 
  
315
 
    (void) pthread_mutex_destroy(&LOCK_event_loop);
316
 
    (void) pthread_mutex_destroy(&LOCK_session_add);
317
 
    (void) pthread_mutex_destroy(&LOCK_session_kill);
318
 
    (void) pthread_attr_destroy(&attr);
319
 
  }
320
 
 
321
 
  /**
322
 
   * @brief 
323
 
   *  Notify the thread pool about a new connection
324
 
   *
325
 
   * @param[in] the newly connected session 
326
 
   *
327
 
   * @return 
328
 
   *  True if there is an error.
329
 
   */
330
 
  virtual bool addSession(Session *session)
331
 
  {
332
 
    assert(session->scheduler_arg == NULL);
333
 
    session_scheduler *scheduler= new session_scheduler(session);
334
 
  
335
 
    if (scheduler == NULL)
336
 
      return true;
337
 
  
338
 
    session->scheduler_arg= (void *)scheduler;
339
 
  
340
 
    libevent_session_add(session);
341
 
  
342
 
    return false;
343
 
  }
344
 
  
345
 
  
346
 
  /**
347
 
   * @brief
348
 
   *  Signal a waiting connection it's time to die.
349
 
   *
350
 
   * @details 
351
 
   *  This function will signal libevent the Session should be killed.
352
 
   *
353
 
   * @param[in]  session The connection to kill
354
 
   */
355
 
  virtual void killSession(Session *session)
356
 
  {
357
 
    char c= 0;
358
 
    
359
 
    pthread_mutex_lock(&LOCK_session_kill);
360
 
 
361
 
    if (sessions_to_be_killed.empty())
362
 
    {
363
 
      /* 
364
 
       Notify libevent with the killing event if this's the first killing
365
 
       notification of the batch
366
 
      */
367
 
      size_t written= write(session_kill_pipe[1], &c, sizeof(c));
368
 
      assert(written == sizeof(c));
369
 
    }
370
 
 
371
 
    /*
372
 
     Push into the sessions_to_be_killed queue
373
 
    */
374
 
    sessions_to_be_killed.push(session);
375
 
    pthread_mutex_unlock(&LOCK_session_kill);
376
 
  }
377
 
 
378
 
  /**
379
 
   * @brief
380
 
   *  Create all threads for the thread pool
381
 
   *
382
 
   * @details
383
 
   *  After threads are created we wait until all threads has signaled that
384
 
   *  they have started before we return
385
 
   *
386
 
   * @retval 0 Ok
387
 
   * @retval 1 We got an error creating the thread pool. In this case we will abort all created threads.
388
 
   */
389
 
  bool libevent_init(void)
390
 
  {
391
 
    uint32_t x;
392
 
  
393
 
    event_init();
394
 
  
395
 
    pthread_mutex_init(&LOCK_event_loop, NULL);
396
 
    pthread_mutex_init(&LOCK_session_add, NULL);
397
 
    pthread_mutex_init(&LOCK_session_kill, NULL);
398
 
  
399
 
    /* Set up the pipe used to add new sessions to the event pool */
400
 
    if (init_pipe(session_add_pipe))
401
 
    {
402
 
      errmsg_printf(ERRMSG_LVL_ERROR,
403
 
                    _("init_pipe(session_add_pipe) error in libevent_init\n"));
404
 
      return true;
405
 
    }
406
 
    /* Set up the pipe used to kill sessions in the event queue */
407
 
    if (init_pipe(session_kill_pipe))
408
 
    {
409
 
      errmsg_printf(ERRMSG_LVL_ERROR,
410
 
                    _("init_pipe(session_kill_pipe) error in libevent_init\n"));
411
 
      close(session_add_pipe[0]);
412
 
      close(session_add_pipe[1]);
413
 
     return true;
414
 
    }
415
 
    event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
416
 
              libevent_add_session_callback, NULL);
417
 
    event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
418
 
              libevent_kill_session_callback, NULL);
419
 
  
420
 
   if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
421
 
   {
422
 
     errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
423
 
     return true;
424
 
  
425
 
   }
426
 
    /* Set up the thread pool */
427
 
    pthread_mutex_lock(&LOCK_thread_count);
428
 
  
429
 
    for (x= 0; x < size; x++)
430
 
    {
431
 
      pthread_t thread;
432
 
      int error;
433
 
      if ((error= pthread_create(&thread, &attr, libevent_thread_proc, 0)))
434
 
      {
435
 
        errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
436
 
                        error);
437
 
        pthread_mutex_unlock(&LOCK_thread_count);
438
 
        return true;
439
 
      }
440
 
    }
441
 
  
442
 
    /* Wait until all threads are created */
443
 
    while (created_threads != size)
444
 
      pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
445
 
    pthread_mutex_unlock(&LOCK_thread_count);
446
 
  
447
 
    return false;
448
 
  }
449
 
}; 
450
 
 
451
 
 
452
 
/**
453
 
 * @brief 
454
 
 *  Factory class used to produce a Pool_of_threads_scheduler
455
 
 */
456
 
class PoolOfThreadsFactory : public plugin::SchedulerFactory
457
 
{
458
 
public:
459
 
  PoolOfThreadsFactory() : SchedulerFactory("pool_of_threads") {}
460
 
  ~PoolOfThreadsFactory() { if (scheduler != NULL) delete scheduler; }
461
 
  plugin::Scheduler *operator() ()
462
 
  {
463
 
    if (scheduler == NULL)
464
 
    {
465
 
      PoolOfThreadsScheduler *pot= new PoolOfThreadsScheduler();
466
 
      if (pot->libevent_init())
467
 
      {
468
 
        delete pot;
469
 
        return NULL;
470
 
      }
471
 
      scheduler= pot;
472
 
    }
473
 
    return scheduler;
474
 
  }
475
 
};
476
 
 
477
257
/**
478
258
 * @brief 
479
259
 *  Close and delete a connection.
683
463
}
684
464
 
685
465
 
686
 
static PoolOfThreadsFactory *factory= NULL;
 
466
PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
 
467
{
 
468
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
469
 
 
470
  kill_pool_threads= true;
 
471
  while (created_threads)
 
472
  {
 
473
    /* 
 
474
      Wake up the event loop 
 
475
    */
 
476
    char c= 0;
 
477
    size_t written= write(session_add_pipe[1], &c, sizeof(c));
 
478
    assert(written == sizeof(c));
 
479
 
 
480
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
481
  }
 
482
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
483
 
 
484
  event_del(&session_add_event);
 
485
  close(session_add_pipe[0]);
 
486
  close(session_add_pipe[1]);
 
487
  event_del(&session_kill_event);
 
488
  close(session_kill_pipe[0]);
 
489
  close(session_kill_pipe[1]);
 
490
 
 
491
  (void) pthread_mutex_destroy(&LOCK_event_loop);
 
492
  (void) pthread_mutex_destroy(&LOCK_session_add);
 
493
  (void) pthread_mutex_destroy(&LOCK_session_kill);
 
494
  (void) pthread_attr_destroy(&attr);
 
495
}
 
496
 
 
497
 
 
498
bool PoolOfThreadsScheduler::addSession(Session *session)
 
499
{
 
500
  assert(session->scheduler_arg == NULL);
 
501
  session_scheduler *scheduler= new session_scheduler(session);
 
502
 
 
503
  if (scheduler == NULL)
 
504
    return true;
 
505
 
 
506
  session->scheduler_arg= (void *)scheduler;
 
507
 
 
508
  libevent_session_add(session);
 
509
 
 
510
  return false;
 
511
}
 
512
 
 
513
 
 
514
void PoolOfThreadsScheduler::killSession(Session *session)
 
515
{
 
516
  char c= 0;
 
517
 
 
518
  pthread_mutex_lock(&LOCK_session_kill);
 
519
 
 
520
  if (sessions_to_be_killed.empty())
 
521
  {
 
522
    /* 
 
523
      Notify libevent with the killing event if this's the first killing
 
524
      notification of the batch
 
525
    */
 
526
    size_t written= write(session_kill_pipe[1], &c, sizeof(c));
 
527
    assert(written == sizeof(c));
 
528
  }
 
529
 
 
530
  /*
 
531
    Push into the sessions_to_be_killed queue
 
532
  */
 
533
  sessions_to_be_killed.push(session);
 
534
  pthread_mutex_unlock(&LOCK_session_kill);
 
535
}
 
536
 
 
537
 
 
538
bool PoolOfThreadsScheduler::libevent_init(void)
 
539
{
 
540
  uint32_t x;
 
541
 
 
542
  event_init();
 
543
 
 
544
  pthread_mutex_init(&LOCK_event_loop, NULL);
 
545
  pthread_mutex_init(&LOCK_session_add, NULL);
 
546
  pthread_mutex_init(&LOCK_session_kill, NULL);
 
547
 
 
548
  /* Set up the pipe used to add new sessions to the event pool */
 
549
  if (init_pipe(session_add_pipe))
 
550
  {
 
551
    errmsg_printf(ERRMSG_LVL_ERROR,
 
552
                  _("init_pipe(session_add_pipe) error in libevent_init\n"));
 
553
    return true;
 
554
  }
 
555
  /* Set up the pipe used to kill sessions in the event queue */
 
556
  if (init_pipe(session_kill_pipe))
 
557
  {
 
558
    errmsg_printf(ERRMSG_LVL_ERROR,
 
559
                  _("init_pipe(session_kill_pipe) error in libevent_init\n"));
 
560
    close(session_add_pipe[0]);
 
561
    close(session_add_pipe[1]);
 
562
    return true;
 
563
  }
 
564
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
 
565
            libevent_add_session_callback, NULL);
 
566
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
 
567
            libevent_kill_session_callback, NULL);
 
568
 
 
569
  if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
 
570
  {
 
571
    errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
 
572
    return true;
 
573
 
 
574
  }
 
575
  /* Set up the thread pool */
 
576
  pthread_mutex_lock(&LOCK_thread_count);
 
577
 
 
578
  for (x= 0; x < size; x++)
 
579
  {
 
580
    pthread_t thread;
 
581
    int error;
 
582
    if ((error= pthread_create(&thread, &attr, libevent_thread_proc, 0)))
 
583
    {
 
584
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
 
585
                    error);
 
586
      pthread_mutex_unlock(&LOCK_thread_count);
 
587
      return true;
 
588
    }
 
589
  }
 
590
 
 
591
  /* Wait until all threads are created */
 
592
  while (created_threads != size)
 
593
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
 
594
  pthread_mutex_unlock(&LOCK_thread_count);
 
595
 
 
596
  return false;
 
597
}
687
598
 
688
599
 
689
600
/**
696
607
{
697
608
  assert(size != 0);
698
609
 
699
 
  factory= new PoolOfThreadsFactory();
700
 
  registry.add(factory);
 
610
  scheduler= new PoolOfThreadsScheduler("pool_of_threads");
 
611
  registry.add(scheduler);
701
612
 
702
613
  return 0;
703
614
}
708
619
 */
709
620
static int deinit(drizzled::plugin::Registry &registry)
710
621
{
711
 
  if (factory)
712
 
  {
713
 
    registry.remove(factory);
714
 
    delete factory;
715
 
  }
 
622
  registry.remove(scheduler);
 
623
  delete scheduler;
 
624
 
716
625
  return 0;
717
626
}
718
627