~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pool_of_threads/pool_of_threads.cc

Merged Eric from lp:~eday/drizzle/eday-merge

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
#include <drizzled/gettext.h>
18
18
#include <drizzled/error.h>
19
19
#include <drizzled/plugin/scheduler.h>
20
 
#include <drizzled/connect.h>
21
20
#include <drizzled/sql_parse.h>
22
21
#include <drizzled/session.h>
23
22
#include "session_scheduler.h"
26
25
#include <event.h>
27
26
 
28
27
using namespace std;
 
28
using namespace drizzled;
29
29
 
30
30
static volatile bool kill_pool_threads= false;
31
31
 
96
96
{
97
97
  safe_mutex_assert_owner(&LOCK_event_loop);
98
98
  Session *session= (Session*)ctx;
99
 
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
99
  session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
100
100
  assert(scheduler);
101
101
  sessions_waiting_for_io.remove(scheduler->session);
102
102
  sessions_need_processing.push_front(scheduler->session);
138
138
    Session *session= *it;
139
139
    if (session->killed == Session::KILL_CONNECTION)
140
140
    {
141
 
      session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
141
      session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
142
142
      assert(scheduler);
143
143
      /*
144
144
        Delete from libevent and add to the processing queue.
183
183
    /* pop the first session off the list */
184
184
    Session* session= sessions_need_adding.front();
185
185
    sessions_need_adding.pop_front();
186
 
    session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
186
    session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
187
187
    assert(scheduler);
188
188
 
189
189
    pthread_mutex_unlock(&LOCK_session_add);
215
215
}
216
216
 
217
217
 
218
 
class Pool_of_threads_scheduler: public Scheduler
 
218
class PoolOfThreadsScheduler: public plugin::Scheduler
219
219
{
220
220
private:
221
 
  pthread_attr_t thread_attrib;
 
221
  pthread_attr_t attr;
222
222
 
223
223
public:
224
 
  Pool_of_threads_scheduler(uint32_t max_size_in)
225
 
    : Scheduler(max_size_in)
 
224
  PoolOfThreadsScheduler(): Scheduler()
226
225
  {
227
 
    /* Parameter for threads created for connections */
228
 
    (void) pthread_attr_init(&thread_attrib);
229
 
    (void) pthread_attr_setdetachstate(&thread_attrib,
230
 
                                     PTHREAD_CREATE_DETACHED);
231
 
    pthread_attr_setscope(&thread_attrib, PTHREAD_SCOPE_SYSTEM);
232
 
    {
233
 
      struct sched_param tmp_sched_param;
234
 
  
235
 
      memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
236
 
      tmp_sched_param.sched_priority= WAIT_PRIOR;
237
 
      (void)pthread_attr_setschedparam(&thread_attrib, &tmp_sched_param);
238
 
    }
 
226
    struct sched_param tmp_sched_param;
 
227
 
 
228
    /* Setup attribute parameter for session threads. */
 
229
    (void) pthread_attr_init(&attr);
 
230
    (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 
231
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
 
232
    memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
 
233
    tmp_sched_param.sched_priority= WAIT_PRIOR;
 
234
    (void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
239
235
  }
240
236
 
241
 
  ~Pool_of_threads_scheduler()
 
237
  ~PoolOfThreadsScheduler()
242
238
  {
243
239
    (void) pthread_mutex_lock(&LOCK_thread_count);
244
240
  
263
259
  
264
260
    (void) pthread_mutex_destroy(&LOCK_event_loop);
265
261
    (void) pthread_mutex_destroy(&LOCK_session_add);
 
262
    (void) pthread_attr_destroy(&attr);
266
263
  }
267
264
 
268
265
  /**
272
269
      LOCK_thread_count is locked on entry. This function MUST unlock it!
273
270
  */
274
271
  
275
 
  virtual bool add_connection(Session *session)
 
272
  virtual bool addSession(Session *session)
276
273
  {
277
 
    assert(session->scheduler == NULL);
 
274
    assert(session->scheduler_arg == NULL);
278
275
    session_scheduler *scheduler= new session_scheduler(session);
279
276
  
280
277
    if (scheduler == NULL)
281
278
      return true;
282
279
  
283
 
    session->scheduler= (void *)scheduler;
 
280
    session->scheduler_arg= (void *)scheduler;
284
281
  
285
282
    libevent_session_add(session);
286
283
  
298
295
    @param[in]  session The connection to kill
299
296
  */
300
297
  
301
 
  virtual void post_kill_notification(Session *)
 
298
  virtual void killSession(Session *)
302
299
  {
303
300
    /*
304
301
      Note, we just wake up libevent with an event that a Session should be killed,
314
311
    assert(written==sizeof(c));
315
312
  }
316
313
 
317
 
  virtual uint32_t count(void)
318
 
  {
319
 
    return created_threads;
320
 
  }
321
 
 
322
314
  /**
323
315
    Create all threads for the thread pool
324
316
  
375
367
    {
376
368
      pthread_t thread;
377
369
      int error;
378
 
      if ((error= pthread_create(&thread, &thread_attrib, libevent_thread_proc, 0)))
 
370
      if ((error= pthread_create(&thread, &attr, libevent_thread_proc, 0)))
379
371
      {
380
372
        errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
381
373
                        error);
394
386
}; 
395
387
 
396
388
 
397
 
class PoolOfThreadsFactory : public SchedulerFactory
 
389
class PoolOfThreadsFactory : public plugin::SchedulerFactory
398
390
{
399
391
public:
400
392
  PoolOfThreadsFactory() : SchedulerFactory("pool_of_threads") {}
401
393
  ~PoolOfThreadsFactory() { if (scheduler != NULL) delete scheduler; }
402
 
  Scheduler *operator() ()
 
394
  plugin::Scheduler *operator() ()
403
395
  {
404
396
    if (scheduler == NULL)
405
397
    {
406
 
      Pool_of_threads_scheduler *pot= new Pool_of_threads_scheduler(size);
 
398
      PoolOfThreadsScheduler *pot= new PoolOfThreadsScheduler();
407
399
      if (pot->libevent_init())
408
400
      {
409
401
        delete pot;
421
413
 
422
414
static void libevent_connection_close(Session *session)
423
415
{
424
 
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
416
  session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
425
417
  assert(scheduler);
426
418
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages
427
419
 
432
424
  scheduler->thread_detach();
433
425
  
434
426
  delete scheduler;
435
 
  session->scheduler= NULL;
 
427
  session->scheduler_arg= NULL;
436
428
 
437
429
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
438
430
 
495
487
    /* pop the first session off the list */
496
488
    session= sessions_need_processing.front();
497
489
    sessions_need_processing.pop_front();
498
 
    session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
490
    session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
499
491
 
500
492
    (void) pthread_mutex_unlock(&LOCK_event_loop);
501
493
 
513
505
    /* is the connection logged in yet? */
514
506
    if (!scheduler->logged_in)
515
507
    {
516
 
      if (! session->authenticate())
 
508
      if (session->authenticate())
517
509
      {
518
510
        /* Failed to log in */
519
511
        libevent_connection_close(session);
562
554
 
563
555
static bool libevent_needs_immediate_processing(Session *session)
564
556
{
565
 
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
557
  session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
566
558
 
567
559
  if (libevent_should_close_connection(session))
568
560
  {
597
589
void libevent_session_add(Session* session)
598
590
{
599
591
  char c= 0;
600
 
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
592
  session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
601
593
  assert(scheduler);
602
594
 
603
595
  pthread_mutex_lock(&LOCK_session_add);