~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pool_of_threads/pool_of_threads.cc

  • Committer: Brian Aker
  • Date: 2010-09-16 00:58:27 UTC
  • mfrom: (1766.1.2 trunk)
  • Revision ID: brian@tangent.org-20100916005827-f339tulsvm8ww358
Merge of Brian refactor + fix for Solaris find.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
#include <boost/program_options.hpp>
27
27
#include <drizzled/module/option_map.h>
28
28
 
 
29
#include <boost/thread/thread.hpp>
 
30
#include <boost/bind.hpp>
 
31
 
29
32
namespace po= boost::program_options;
30
33
using namespace std;
31
34
using namespace drizzled;
51
54
static void libevent_connection_close(Session *session);
52
55
void libevent_session_add(Session* session);
53
56
bool libevent_should_close_connection(Session* session);
 
57
void libevent_thread_proc(PoolOfThreadsScheduler *pot_scheduler);
54
58
extern "C" {
 
59
#if 0
55
60
  void *libevent_thread_proc(void *arg);
 
61
#endif
56
62
  void libevent_io_callback(int Fd, short Operation, void *ctx);
57
63
  void libevent_add_session_callback(int Fd, short Operation, void *ctx);
58
64
  void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
131
137
  char c;
132
138
  int count= 0;
133
139
 
134
 
  pthread_mutex_lock(&LOCK_session_kill);
 
140
  LOCK_session_kill.lock();
135
141
  while (! sessions_to_be_killed.empty())
136
142
  {
137
143
 
139
145
     Fetch a session from the queue
140
146
    */
141
147
    Session* session= sessions_to_be_killed.front();
142
 
    pthread_mutex_unlock(&LOCK_session_kill);
 
148
    LOCK_session_kill.unlock();
143
149
 
144
150
    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
145
151
    assert(sched);
158
164
    */
159
165
    sessions_need_processing.push(sched->session);
160
166
 
161
 
    pthread_mutex_lock(&LOCK_session_kill);
 
167
    LOCK_session_kill.lock();
162
168
    /*
163
169
     Pop until this session is already processed
164
170
    */
174
180
    count++;
175
181
  }
176
182
  assert(count == 1);
177
 
  pthread_mutex_unlock(&LOCK_session_kill);
 
183
  LOCK_session_kill.unlock();
178
184
}
179
185
 
180
186
 
203
209
  char c;
204
210
  int count= 0;
205
211
 
206
 
  pthread_mutex_lock(&LOCK_session_add);
 
212
  LOCK_session_add.lock();
207
213
  while (! sessions_need_adding.empty())
208
214
  {
209
215
    /*
210
216
     Pop the first session off the queue 
211
217
    */
212
218
    Session* session= sessions_need_adding.front();
213
 
    pthread_mutex_unlock(&LOCK_session_add);
 
219
    LOCK_session_add.unlock();
214
220
 
215
221
    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
216
222
    assert(sched);
238
244
      }
239
245
    }
240
246
 
241
 
    pthread_mutex_lock(&LOCK_session_add);
 
247
    LOCK_session_add.lock();
242
248
    /*
243
249
     Pop until this session is already processed
244
250
    */
254
260
    count++;
255
261
  }
256
262
  assert(count == 1);
257
 
  pthread_mutex_unlock(&LOCK_session_add);
 
263
  LOCK_session_add.unlock();
258
264
}
259
265
 
260
266
/**
302
308
 *  These procs only return/terminate on shutdown (kill_pool_threads ==
303
309
 *  true).
304
310
 */
305
 
void *libevent_thread_proc(void *ctx)
 
311
void libevent_thread_proc(PoolOfThreadsScheduler *pot_scheduler)
306
312
{
307
313
  if (internal::my_thread_init())
308
314
  {
311
317
    exit(1);
312
318
  }
313
319
 
314
 
  PoolOfThreadsScheduler *pot_scheduler=
315
 
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
316
 
  return pot_scheduler->mainLoop();
 
320
  (void)pot_scheduler->mainLoop();
317
321
}
318
322
 
319
323
void *PoolOfThreadsScheduler::mainLoop()
332
336
  for (;;)
333
337
  {
334
338
    Session *session= NULL;
335
 
    (void) pthread_mutex_lock(&LOCK_event_loop);
 
339
    LOCK_event_loop.lock();
336
340
 
337
341
    /* get session(s) to process */
338
342
    while (sessions_need_processing.empty())
340
344
      if (kill_pool_threads)
341
345
      {
342
346
        /* the flag that we should die has been set */
343
 
        (void) pthread_mutex_unlock(&LOCK_event_loop);
 
347
        LOCK_event_loop.unlock();
344
348
        goto thread_exit;
345
349
      }
346
350
      event_loop(EVLOOP_ONCE);
351
355
    sessions_need_processing.pop();
352
356
    session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
353
357
 
354
 
    (void) pthread_mutex_unlock(&LOCK_event_loop);
 
358
    LOCK_event_loop.lock();
355
359
 
356
360
    /* now we process the connection (session) */
357
361
 
467
471
void PoolOfThreadsScheduler::sessionAddToQueue(session_scheduler *sched)
468
472
{
469
473
  char c= 0;
470
 
  pthread_mutex_lock(&LOCK_session_add);
 
474
  boost::mutex::scoped_lock scopedLock(LOCK_session_add);
471
475
  if (sessions_need_adding.empty())
472
476
  {
473
477
    /* notify libevent */
476
480
  }
477
481
  /* queue for libevent */
478
482
  sessions_need_adding.push(sched->session);
479
 
  pthread_mutex_unlock(&LOCK_session_add);
480
483
}
481
484
 
482
485
 
484
487
  : Scheduler(name_arg), sessions_need_adding(), sessions_to_be_killed(),
485
488
    sessions_need_processing(), sessions_waiting_for_io()
486
489
{
487
 
  struct sched_param tmp_sched_param;
488
 
 
489
 
  memset(&tmp_sched_param, 0, sizeof(struct sched_param));
490
490
  /* Setup attribute parameter for session threads. */
491
491
  (void) pthread_attr_init(&attr);
492
492
  (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
493
493
  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
494
 
 
495
 
  tmp_sched_param.sched_priority= WAIT_PRIOR;
496
 
  (void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
497
 
 
498
 
  pthread_mutex_init(&LOCK_session_add, NULL);
499
 
  pthread_mutex_init(&LOCK_session_kill, NULL);
500
 
  pthread_mutex_init(&LOCK_event_loop, NULL);
501
 
 
502
494
}
503
495
 
504
496
 
527
519
  close(session_kill_pipe[0]);
528
520
  close(session_kill_pipe[1]);
529
521
 
530
 
  (void) pthread_mutex_destroy(&LOCK_event_loop);
531
 
  (void) pthread_mutex_destroy(&LOCK_session_add);
532
 
  (void) pthread_mutex_destroy(&LOCK_session_kill);
533
522
  (void) pthread_attr_destroy(&attr);
534
523
}
535
524
 
554
543
{
555
544
  char c= 0;
556
545
 
557
 
  pthread_mutex_lock(&LOCK_session_kill);
 
546
  boost::mutex::scoped_lock scopedLock(LOCK_session_kill);
558
547
 
559
548
  if (sessions_to_be_killed.empty())
560
549
  {
570
559
    Push into the sessions_to_be_killed queue
571
560
  */
572
561
  sessions_to_be_killed.push(session);
573
 
  pthread_mutex_unlock(&LOCK_session_kill);
574
562
}
575
563
 
576
564
 
577
565
bool PoolOfThreadsScheduler::libevent_init(void)
578
566
{
579
 
  uint32_t x;
580
 
 
581
567
  event_init();
582
568
 
583
569
 
609
595
 
610
596
  }
611
597
  /* Set up the thread pool */
612
 
  LOCK_thread_count.lock();
 
598
  boost::mutex::scoped_lock scopedLock(LOCK_thread_count);
613
599
 
614
 
  for (x= 0; x < pool_size; x++)
 
600
  for (uint32_t x= 0; x < pool_size; x++)
615
601
  {
616
 
    pthread_t thread;
617
 
    int error;
618
 
    if ((error= pthread_create(&thread, &attr, libevent_thread_proc, this)))
 
602
    if (not new boost::thread(boost::bind(libevent_thread_proc, this)))
619
603
    {
620
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
621
 
                    error);
622
 
      LOCK_thread_count.unlock();
 
604
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"), 1);
623
605
      return true;
624
606
    }
625
607
  }
626
608
 
627
609
  /* Wait until all threads are created */
628
610
  while (created_threads != pool_size)
629
 
    pthread_cond_wait(COND_thread_count.native_handle(), LOCK_thread_count.native_handle());
630
 
  LOCK_thread_count.unlock();
 
611
  {
 
612
    COND_thread_count.wait(scopedLock);
 
613
  }
631
614
 
632
615
  return false;
633
616
}