~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pool_of_threads/pool_of_threads.cc

  • Committer: Brian Aker
  • Date: 2009-10-02 19:38:12 UTC
  • mfrom: (1152.1.7 merge)
  • Revision ID: brian@gaz-20091002193812-mpd61oecep74t6gd
Merge Monty + Brian for plugins.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19
19
 */
20
20
 
21
 
#include <drizzled/server_includes.h>
22
 
#include <drizzled/gettext.h>
23
 
#include <drizzled/error.h>
24
 
#include <drizzled/plugin/scheduler.h>
25
 
#include <drizzled/sql_parse.h>
26
 
#include <drizzled/session.h>
27
 
#include <drizzled/plugin/client.h>
28
 
#include "session_scheduler.h"
29
 
#include <string>
30
 
#include <queue>
31
 
#include <set>
32
 
#include <event.h>
 
21
#include <plugin/pool_of_threads/pool_of_threads.h>
33
22
 
34
23
using namespace std;
35
24
using namespace drizzled;
37
26
/* We add this so as to avoid ambiguity with "struct queue" defined in Solaris system header */
38
27
using std::queue;
39
28
 
 
29
/* Global's (TBR) */
 
30
static PoolOfThreadsScheduler *scheduler= NULL;
 
31
 
40
32
/**
41
33
 * Set this to true to trigger killing of all threads in the pool
42
34
 */
122
114
{
123
115
  safe_mutex_assert_owner(&LOCK_event_loop);
124
116
  Session *session= (Session*)ctx;
125
 
  session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
126
 
  assert(scheduler);
127
 
  sessions_waiting_for_io.erase(scheduler->session);
128
 
  sessions_need_processing.push(scheduler->session);
 
117
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
 
118
  assert(sched);
 
119
  sessions_waiting_for_io.erase(sched->session);
 
120
  sessions_need_processing.push(sched->session);
129
121
}
130
122
 
131
123
/**
154
146
    Session* session= sessions_to_be_killed.front();
155
147
    pthread_mutex_unlock(&LOCK_session_kill);
156
148
 
157
 
    session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
158
 
    assert(scheduler);
 
149
    session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
 
150
    assert(sched);
159
151
    /*
160
152
     Delete from libevent and add to the processing queue.
161
153
    */
162
 
    event_del(&scheduler->io_event);
 
154
    event_del(&sched->io_event);
163
155
    /*
164
156
     Remove from the sessions_waiting_for_io set
165
157
    */
168
160
     Push into the sessions_need_processing; the kill action will be
169
161
     performed out of the event loop
170
162
    */
171
 
    sessions_need_processing.push(scheduler->session);
 
163
    sessions_need_processing.push(sched->session);
172
164
 
173
165
    pthread_mutex_lock(&LOCK_session_kill);
174
166
    /*
216
208
     Pop the first session off the queue 
217
209
    */
218
210
    Session* session= sessions_need_adding.front();
219
 
    session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
220
 
    assert(scheduler);
 
211
    session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
 
212
    assert(sched);
221
213
 
222
214
    pthread_mutex_unlock(&LOCK_session_add);
223
215
 
224
 
    if (!scheduler->logged_in || libevent_should_close_connection(session))
 
216
    if (!sched->logged_in || libevent_should_close_connection(session))
225
217
    {
226
218
      /*
227
219
       Add session to sessions_need_processing queue. If it needs closing
228
220
       we'll close it outside of event_loop().
229
221
      */
230
 
      sessions_need_processing.push(scheduler->session);
 
222
      sessions_need_processing.push(sched->session);
231
223
    }
232
224
    else
233
225
    {
234
226
      /* Add to libevent */
235
 
      if (event_add(&scheduler->io_event, NULL))
 
227
      if (event_add(&sched->io_event, NULL))
236
228
      {
237
229
        errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
238
230
        libevent_connection_close(session);
239
231
      }
240
232
      else
241
233
      {
242
 
        sessions_waiting_for_io.insert(scheduler->session);
 
234
        sessions_waiting_for_io.insert(sched->session);
243
235
      }
244
236
    }
245
237
 
262
254
  pthread_mutex_unlock(&LOCK_session_add);
263
255
}
264
256
 
265
 
 
266
 
/**
267
 
 * @brief 
268
 
 *  Derived class for pool of threads scheduler.
269
 
 */
270
 
class PoolOfThreadsScheduler: public plugin::Scheduler
271
 
{
272
 
private:
273
 
  pthread_attr_t attr;
274
 
 
275
 
public:
276
 
  PoolOfThreadsScheduler(): Scheduler()
277
 
  {
278
 
    struct sched_param tmp_sched_param;
279
 
 
280
 
    memset(&tmp_sched_param, 0, sizeof(struct sched_param));
281
 
    /* Setup attribute parameter for session threads. */
282
 
    (void) pthread_attr_init(&attr);
283
 
    (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
284
 
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
285
 
 
286
 
    tmp_sched_param.sched_priority= WAIT_PRIOR;
287
 
    (void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
288
 
  }
289
 
 
290
 
  ~PoolOfThreadsScheduler()
291
 
  {
292
 
    (void) pthread_mutex_lock(&LOCK_thread_count);
293
 
  
294
 
    kill_pool_threads= true;
295
 
    while (created_threads)
296
 
    {
297
 
      /* 
298
 
       Wake up the event loop 
299
 
      */
300
 
      char c= 0;
301
 
      size_t written= write(session_add_pipe[1], &c, sizeof(c));
302
 
      assert(written == sizeof(c));
303
 
  
304
 
      pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
305
 
    }
306
 
    (void) pthread_mutex_unlock(&LOCK_thread_count);
307
 
  
308
 
    event_del(&session_add_event);
309
 
    close(session_add_pipe[0]);
310
 
    close(session_add_pipe[1]);
311
 
    event_del(&session_kill_event);
312
 
    close(session_kill_pipe[0]);
313
 
    close(session_kill_pipe[1]);
314
 
  
315
 
    (void) pthread_mutex_destroy(&LOCK_event_loop);
316
 
    (void) pthread_mutex_destroy(&LOCK_session_add);
317
 
    (void) pthread_mutex_destroy(&LOCK_session_kill);
318
 
    (void) pthread_attr_destroy(&attr);
319
 
  }
320
 
 
321
 
  /**
322
 
   * @brief 
323
 
   *  Notify the thread pool about a new connection
324
 
   *
325
 
   * @param[in] the newly connected session 
326
 
   *
327
 
   * @return 
328
 
   *  True if there is an error.
329
 
   */
330
 
  virtual bool addSession(Session *session)
331
 
  {
332
 
    assert(session->scheduler_arg == NULL);
333
 
    session_scheduler *scheduler= new session_scheduler(session);
334
 
  
335
 
    if (scheduler == NULL)
336
 
      return true;
337
 
  
338
 
    session->scheduler_arg= (void *)scheduler;
339
 
  
340
 
    libevent_session_add(session);
341
 
  
342
 
    return false;
343
 
  }
344
 
  
345
 
  
346
 
  /**
347
 
   * @brief
348
 
   *  Signal a waiting connection it's time to die.
349
 
   *
350
 
   * @details 
351
 
   *  This function will signal libevent the Session should be killed.
352
 
   *
353
 
   * @param[in]  session The connection to kill
354
 
   */
355
 
  virtual void killSession(Session *session)
356
 
  {
357
 
    char c= 0;
358
 
    
359
 
    pthread_mutex_lock(&LOCK_session_kill);
360
 
 
361
 
    if (sessions_to_be_killed.empty())
362
 
    {
363
 
      /* 
364
 
       Notify libevent with the killing event if this's the first killing
365
 
       notification of the batch
366
 
      */
367
 
      size_t written= write(session_kill_pipe[1], &c, sizeof(c));
368
 
      assert(written == sizeof(c));
369
 
    }
370
 
 
371
 
    /*
372
 
     Push into the sessions_to_be_killed queue
373
 
    */
374
 
    sessions_to_be_killed.push(session);
375
 
    pthread_mutex_unlock(&LOCK_session_kill);
376
 
  }
377
 
 
378
 
  /**
379
 
   * @brief
380
 
   *  Create all threads for the thread pool
381
 
   *
382
 
   * @details
383
 
   *  After threads are created we wait until all threads has signaled that
384
 
   *  they have started before we return
385
 
   *
386
 
   * @retval 0 Ok
387
 
   * @retval 1 We got an error creating the thread pool. In this case we will abort all created threads.
388
 
   */
389
 
  bool libevent_init(void)
390
 
  {
391
 
    uint32_t x;
392
 
  
393
 
    event_init();
394
 
  
395
 
    pthread_mutex_init(&LOCK_event_loop, NULL);
396
 
    pthread_mutex_init(&LOCK_session_add, NULL);
397
 
    pthread_mutex_init(&LOCK_session_kill, NULL);
398
 
  
399
 
    /* Set up the pipe used to add new sessions to the event pool */
400
 
    if (init_pipe(session_add_pipe))
401
 
    {
402
 
      errmsg_printf(ERRMSG_LVL_ERROR,
403
 
                    _("init_pipe(session_add_pipe) error in libevent_init\n"));
404
 
      return true;
405
 
    }
406
 
    /* Set up the pipe used to kill sessions in the event queue */
407
 
    if (init_pipe(session_kill_pipe))
408
 
    {
409
 
      errmsg_printf(ERRMSG_LVL_ERROR,
410
 
                    _("init_pipe(session_kill_pipe) error in libevent_init\n"));
411
 
      close(session_add_pipe[0]);
412
 
      close(session_add_pipe[1]);
413
 
     return true;
414
 
    }
415
 
    event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
416
 
              libevent_add_session_callback, NULL);
417
 
    event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
418
 
              libevent_kill_session_callback, NULL);
419
 
  
420
 
   if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
421
 
   {
422
 
     errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
423
 
     return true;
424
 
  
425
 
   }
426
 
    /* Set up the thread pool */
427
 
    pthread_mutex_lock(&LOCK_thread_count);
428
 
  
429
 
    for (x= 0; x < size; x++)
430
 
    {
431
 
      pthread_t thread;
432
 
      int error;
433
 
      if ((error= pthread_create(&thread, &attr, libevent_thread_proc, 0)))
434
 
      {
435
 
        errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
436
 
                        error);
437
 
        pthread_mutex_unlock(&LOCK_thread_count);
438
 
        return true;
439
 
      }
440
 
    }
441
 
  
442
 
    /* Wait until all threads are created */
443
 
    while (created_threads != size)
444
 
      pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
445
 
    pthread_mutex_unlock(&LOCK_thread_count);
446
 
  
447
 
    return false;
448
 
  }
449
 
}; 
450
 
 
451
 
 
452
 
/**
453
 
 * @brief 
454
 
 *  Factory class used to produce a Pool_of_threads_scheduler
455
 
 */
456
 
class PoolOfThreadsFactory : public plugin::SchedulerFactory
457
 
{
458
 
public:
459
 
  PoolOfThreadsFactory() : SchedulerFactory("pool_of_threads") {}
460
 
  ~PoolOfThreadsFactory() { if (scheduler != NULL) delete scheduler; }
461
 
  plugin::Scheduler *operator() ()
462
 
  {
463
 
    if (scheduler == NULL)
464
 
    {
465
 
      PoolOfThreadsScheduler *pot= new PoolOfThreadsScheduler();
466
 
      if (pot->libevent_init())
467
 
      {
468
 
        delete pot;
469
 
        return NULL;
470
 
      }
471
 
      scheduler= pot;
472
 
    }
473
 
    return scheduler;
474
 
  }
475
 
};
476
 
 
477
257
/**
478
258
 * @brief 
479
259
 *  Close and delete a connection.
480
260
 */
481
261
static void libevent_connection_close(Session *session)
482
262
{
483
 
  session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
484
 
  assert(scheduler);
 
263
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
 
264
  assert(sched);
485
265
  session->killed= Session::KILL_CONNECTION;    /* Avoid error messages */
486
266
 
487
267
  if (session->client->getFileDescriptor() >= 0) /* not already closed */
488
268
  {
489
269
    session->disconnect(0, true);
490
270
  }
491
 
  scheduler->thread_detach();
 
271
  sched->thread_detach();
492
272
  
493
 
  delete scheduler;
 
273
  delete sched;
494
274
  session->scheduler_arg= NULL;
495
275
 
496
276
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
558
338
    /* pop the first session off the queue */
559
339
    session= sessions_need_processing.front();
560
340
    sessions_need_processing.pop();
561
 
    session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
 
341
    session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
562
342
 
563
343
    (void) pthread_mutex_unlock(&LOCK_event_loop);
564
344
 
567
347
    /* set up the session<->thread links. */
568
348
    session->thread_stack= (char*) &session;
569
349
 
570
 
    if (scheduler->thread_attach())
 
350
    if (sched->thread_attach())
571
351
    {
572
352
      libevent_connection_close(session);
573
353
      continue;
574
354
    }
575
355
 
576
356
    /* is the connection logged in yet? */
577
 
    if (!scheduler->logged_in)
 
357
    if (!sched->logged_in)
578
358
    {
579
359
      if (session->authenticate())
580
360
      {
585
365
      else
586
366
      {
587
367
        /* login successful */
588
 
        scheduler->logged_in= true;
 
368
        sched->logged_in= true;
589
369
        session->prepareForQueries();
590
370
        if (!libevent_needs_immediate_processing(session))
591
371
          continue; /* New connection is now waiting for data in libevent*/
629
409
 */
630
410
static bool libevent_needs_immediate_processing(Session *session)
631
411
{
632
 
  session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
 
412
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
633
413
 
634
414
  if (libevent_should_close_connection(session))
635
415
  {
647
427
  if (session->client->haveMoreData())
648
428
    return true;
649
429
 
650
 
  scheduler->thread_detach();
 
430
  sched->thread_detach();
651
431
  libevent_session_add(session);
652
432
 
653
433
  return false;
667
447
void libevent_session_add(Session* session)
668
448
{
669
449
  char c= 0;
670
 
  session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
671
 
  assert(scheduler);
 
450
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
 
451
  assert(sched);
672
452
 
673
453
  pthread_mutex_lock(&LOCK_session_add);
674
454
  if (sessions_need_adding.empty())
678
458
    assert(written == sizeof(c));
679
459
  }
680
460
  /* queue for libevent */
681
 
  sessions_need_adding.push(scheduler->session);
 
461
  sessions_need_adding.push(sched->session);
682
462
  pthread_mutex_unlock(&LOCK_session_add);
683
463
}
684
464
 
685
465
 
686
 
static PoolOfThreadsFactory *factory= NULL;
 
466
PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
 
467
{
 
468
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
469
 
 
470
  kill_pool_threads= true;
 
471
  while (created_threads)
 
472
  {
 
473
    /* 
 
474
      Wake up the event loop 
 
475
    */
 
476
    char c= 0;
 
477
    size_t written= write(session_add_pipe[1], &c, sizeof(c));
 
478
    assert(written == sizeof(c));
 
479
 
 
480
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
481
  }
 
482
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
483
 
 
484
  event_del(&session_add_event);
 
485
  close(session_add_pipe[0]);
 
486
  close(session_add_pipe[1]);
 
487
  event_del(&session_kill_event);
 
488
  close(session_kill_pipe[0]);
 
489
  close(session_kill_pipe[1]);
 
490
 
 
491
  (void) pthread_mutex_destroy(&LOCK_event_loop);
 
492
  (void) pthread_mutex_destroy(&LOCK_session_add);
 
493
  (void) pthread_mutex_destroy(&LOCK_session_kill);
 
494
  (void) pthread_attr_destroy(&attr);
 
495
}
 
496
 
 
497
 
 
498
bool PoolOfThreadsScheduler::addSession(Session *session)
 
499
{
 
500
  assert(session->scheduler_arg == NULL);
 
501
  session_scheduler *sched= new session_scheduler(session);
 
502
 
 
503
  if (sched == NULL)
 
504
    return true;
 
505
 
 
506
  session->scheduler_arg= (void *)sched;
 
507
 
 
508
  libevent_session_add(session);
 
509
 
 
510
  return false;
 
511
}
 
512
 
 
513
 
 
514
void PoolOfThreadsScheduler::killSession(Session *session)
 
515
{
 
516
  char c= 0;
 
517
 
 
518
  pthread_mutex_lock(&LOCK_session_kill);
 
519
 
 
520
  if (sessions_to_be_killed.empty())
 
521
  {
 
522
    /* 
 
523
      Notify libevent with the killing event if this's the first killing
 
524
      notification of the batch
 
525
    */
 
526
    size_t written= write(session_kill_pipe[1], &c, sizeof(c));
 
527
    assert(written == sizeof(c));
 
528
  }
 
529
 
 
530
  /*
 
531
    Push into the sessions_to_be_killed queue
 
532
  */
 
533
  sessions_to_be_killed.push(session);
 
534
  pthread_mutex_unlock(&LOCK_session_kill);
 
535
}
 
536
 
 
537
 
 
538
bool PoolOfThreadsScheduler::libevent_init(void)
 
539
{
 
540
  uint32_t x;
 
541
 
 
542
  event_init();
 
543
 
 
544
  pthread_mutex_init(&LOCK_event_loop, NULL);
 
545
  pthread_mutex_init(&LOCK_session_add, NULL);
 
546
  pthread_mutex_init(&LOCK_session_kill, NULL);
 
547
 
 
548
  /* Set up the pipe used to add new sessions to the event pool */
 
549
  if (init_pipe(session_add_pipe))
 
550
  {
 
551
    errmsg_printf(ERRMSG_LVL_ERROR,
 
552
                  _("init_pipe(session_add_pipe) error in libevent_init\n"));
 
553
    return true;
 
554
  }
 
555
  /* Set up the pipe used to kill sessions in the event queue */
 
556
  if (init_pipe(session_kill_pipe))
 
557
  {
 
558
    errmsg_printf(ERRMSG_LVL_ERROR,
 
559
                  _("init_pipe(session_kill_pipe) error in libevent_init\n"));
 
560
    close(session_add_pipe[0]);
 
561
    close(session_add_pipe[1]);
 
562
    return true;
 
563
  }
 
564
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
 
565
            libevent_add_session_callback, NULL);
 
566
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
 
567
            libevent_kill_session_callback, NULL);
 
568
 
 
569
  if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
 
570
  {
 
571
    errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
 
572
    return true;
 
573
 
 
574
  }
 
575
  /* Set up the thread pool */
 
576
  pthread_mutex_lock(&LOCK_thread_count);
 
577
 
 
578
  for (x= 0; x < size; x++)
 
579
  {
 
580
    pthread_t thread;
 
581
    int error;
 
582
    if ((error= pthread_create(&thread, &attr, libevent_thread_proc, 0)))
 
583
    {
 
584
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
 
585
                    error);
 
586
      pthread_mutex_unlock(&LOCK_thread_count);
 
587
      return true;
 
588
    }
 
589
  }
 
590
 
 
591
  /* Wait until all threads are created */
 
592
  while (created_threads != size)
 
593
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
 
594
  pthread_mutex_unlock(&LOCK_thread_count);
 
595
 
 
596
  return false;
 
597
}
687
598
 
688
599
 
689
600
/**
696
607
{
697
608
  assert(size != 0);
698
609
 
699
 
  factory= new PoolOfThreadsFactory();
700
 
  registry.add(factory);
 
610
  scheduler= new PoolOfThreadsScheduler("pool_of_threads");
 
611
  registry.add(scheduler);
701
612
 
702
613
  return 0;
703
614
}
708
619
 */
709
620
static int deinit(drizzled::plugin::Registry &registry)
710
621
{
711
 
  if (factory)
712
 
  {
713
 
    registry.remove(factory);
714
 
    delete factory;
715
 
  }
 
622
  registry.remove(scheduler);
 
623
  delete scheduler;
 
624
 
716
625
  return 0;
717
626
}
718
627