~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/scheduler.cc

  • Committer: Brian Aker
  • Date: 2009-01-24 04:31:39 UTC
  • Revision ID: brian@gir-3.local-20090124043139-5cu9wjefszrnyhe0
Refactored all current scheduler to be behind scheduler plugin api.

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
#include <drizzled/gettext.h>
24
24
#include <drizzled/sql_parse.h>
25
25
#include <drizzled/scheduler.h>
26
 
#include <drizzled/session.h>
27
26
/* API for connecting, logging in to a drizzled server */
28
27
#include <drizzled/connect.h>
29
28
 
 
29
class Session;
 
30
 
30
31
/*
31
32
  'Dummy' functions to be used when we don't need any handling for a scheduler
32
33
  event
52
53
   end_thread(end_thread_dummy), end(end_dummy)
53
54
{}
54
55
 
55
 
static uint32_t created_threads, killed_threads;
56
 
static bool kill_pool_threads;
57
 
 
58
 
static struct event session_add_event;
59
 
static struct event session_kill_event;
60
 
 
61
 
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
62
 
static LIST *sessions_need_adding= NULL;    /* list of sessions to add to libevent queue */
63
 
 
64
 
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
65
 
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
66
 
 
67
 
/*
68
 
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and
69
 
  event_del) and sessions_need_processing and sessions_waiting_for_io.
70
 
*/
71
 
static pthread_mutex_t LOCK_event_loop;
72
 
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
73
 
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
74
 
 
75
 
pthread_handler_t libevent_thread_proc(void *arg);
76
 
static void libevent_end();
77
 
static bool libevent_needs_immediate_processing(Session *session);
78
 
static void libevent_connection_close(Session *session);
79
 
static bool libevent_should_close_connection(Session* session);
80
 
static void libevent_session_add(Session* session);
81
 
void libevent_io_callback(int Fd, short Operation, void *ctx);
82
 
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
83
 
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
84
 
 
85
 
 
86
 
/*
87
 
  Create a pipe and set to non-blocking.
88
 
  Returns true if there is an error.
89
 
*/
90
 
 
91
 
static bool init_pipe(int pipe_fds[])
92
 
{
93
 
  int flags;
94
 
  return pipe(pipe_fds) < 0 ||
95
 
          (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
96
 
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
97
 
          (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
98
 
          fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
99
 
}
100
 
 
101
 
 
102
 
/*
103
 
  session_scheduler keeps the link between Session and events.
104
 
  It's embedded in the Session class.
105
 
*/
106
 
 
107
 
session_scheduler::session_scheduler()
108
 
  : logged_in(false), io_event(NULL), thread_attached(false)
109
 
{
110
 
}
111
 
 
112
 
 
113
 
session_scheduler::~session_scheduler()
114
 
{
115
 
  delete io_event;
116
 
}
117
 
 
118
 
 
119
 
session_scheduler::session_scheduler(const session_scheduler&)
120
 
  : logged_in(false), io_event(NULL), thread_attached(false)
121
 
{}
122
 
 
123
 
void session_scheduler::operator=(const session_scheduler&)
124
 
{}
125
 
 
126
 
bool session_scheduler::init(Session *parent_session)
127
 
{
128
 
  io_event= new struct event;
129
 
 
130
 
  if (io_event == NULL)
131
 
  {
132
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Memory allocation error in session_scheduler::init\n"));
133
 
    return true;
134
 
  }
135
 
  memset(io_event, 0, sizeof(*io_event));
136
 
 
137
 
  event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
138
 
            libevent_io_callback, (void*)parent_session);
139
 
 
140
 
  list.data= parent_session;
141
 
 
142
 
  return false;
143
 
}
144
 
 
145
 
 
146
 
/*
147
 
  Attach/associate the connection with the OS thread, for command processing.
148
 
*/
149
 
 
150
 
bool session_scheduler::thread_attach()
151
 
{
152
 
  assert(!thread_attached);
153
 
  Session* session = (Session*)list.data;
154
 
  if (libevent_should_close_connection(session) ||
155
 
      setup_connection_thread_globals(session))
156
 
  {
157
 
    return true;
158
 
  }
159
 
  my_errno= 0;
160
 
  session->mysys_var->abort= 0;
161
 
  thread_attached= true;
162
 
 
163
 
  return false;
164
 
}
165
 
 
166
 
 
167
 
/*
168
 
  Detach/disassociate the connection with the OS thread.
169
 
*/
170
 
 
171
 
void session_scheduler::thread_detach()
172
 
{
173
 
  if (thread_attached)
174
 
  {
175
 
    Session* session = (Session*)list.data;
176
 
    session->mysys_var= NULL;
177
 
    thread_attached= false;
178
 
  }
179
 
}
180
 
 
181
 
/**
182
 
  Create all threads for the thread pool
183
 
 
184
 
  NOTES
185
 
    After threads are created we wait until all threads has signaled that
186
 
    they have started before we return
187
 
 
188
 
  RETURN
189
 
    0  ok
190
 
    1  We got an error creating the thread pool
191
 
       In this case we will abort all created threads
192
 
*/
193
 
 
194
 
static bool libevent_init(void)
195
 
{
196
 
  uint32_t i;
197
 
 
198
 
  event_init();
199
 
 
200
 
  created_threads= 0;
201
 
  killed_threads= 0;
202
 
  kill_pool_threads= false;
203
 
 
204
 
  pthread_mutex_init(&LOCK_event_loop, NULL);
205
 
  pthread_mutex_init(&LOCK_session_add, NULL);
206
 
 
207
 
  /* set up the pipe used to add new sessions to the event pool */
208
 
  if (init_pipe(session_add_pipe))
209
 
  {
210
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_add_pipe) error in libevent_init\n"));
211
 
    return(1);
212
 
  }
213
 
  /* set up the pipe used to kill sessions in the event queue */
214
 
  if (init_pipe(session_kill_pipe))
215
 
  {
216
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_kill_pipe) error in libevent_init\n"));
217
 
    close(session_add_pipe[0]);
218
 
    close(session_add_pipe[1]);
219
 
    return(1);
220
 
  }
221
 
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
222
 
            libevent_add_session_callback, NULL);
223
 
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
224
 
            libevent_kill_session_callback, NULL);
225
 
 
226
 
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
227
 
 {
228
 
   errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
229
 
   libevent_end();
230
 
   return(1);
231
 
 
232
 
 }
233
 
  /* Set up the thread pool */
234
 
  created_threads= killed_threads= 0;
235
 
  pthread_mutex_lock(&LOCK_thread_count);
236
 
 
237
 
  for (i= 0; i < thread_pool_size; i++)
238
 
  {
239
 
    pthread_t thread;
240
 
    int error;
241
 
    if ((error= pthread_create(&thread, &connection_attrib,
242
 
                               libevent_thread_proc, 0)))
243
 
    {
244
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
245
 
                      error);
246
 
      pthread_mutex_unlock(&LOCK_thread_count);
247
 
      libevent_end();                      // Cleanup
248
 
      return(true);
249
 
    }
250
 
  }
251
 
 
252
 
  /* Wait until all threads are created */
253
 
  while (created_threads != thread_pool_size)
254
 
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
255
 
  pthread_mutex_unlock(&LOCK_thread_count);
256
 
 
257
 
  return(false);
258
 
}
259
 
 
260
 
 
261
 
/*
262
 
  This is called when data is ready on the socket.
263
 
 
264
 
  NOTES
265
 
    This is only called by the thread that owns LOCK_event_loop.
266
 
 
267
 
    We add the session that got the data to sessions_need_processing, and
268
 
    cause the libevent event_loop() to terminate. Then this same thread will
269
 
    return from event_loop and pick the session value back up for processing.
270
 
*/
271
 
 
272
 
void libevent_io_callback(int, short, void *ctx)
273
 
{
274
 
  safe_mutex_assert_owner(&LOCK_event_loop);
275
 
  Session *session= (Session*)ctx;
276
 
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
277
 
  sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
278
 
}
279
 
 
280
 
/*
281
 
  This is called when we have a thread we want to be killed.
282
 
 
283
 
  NOTES
284
 
    This is only called by the thread that owns LOCK_event_loop.
285
 
*/
286
 
 
287
 
void libevent_kill_session_callback(int Fd, short, void*)
288
 
{
289
 
  safe_mutex_assert_owner(&LOCK_event_loop);
290
 
 
291
 
  /* clear the pending events */
292
 
  char c;
293
 
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
294
 
  {}
295
 
 
296
 
  LIST* list= sessions_waiting_for_io;
297
 
  while (list)
298
 
  {
299
 
    Session *session= (Session*)list->data;
300
 
    list= list_rest(list);
301
 
    if (session->killed == Session::KILL_CONNECTION)
302
 
    {
303
 
      /*
304
 
        Delete from libevent and add to the processing queue.
305
 
      */
306
 
      event_del(session->scheduler.io_event);
307
 
      sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
308
 
                                       &session->scheduler.list);
309
 
      sessions_need_processing= list_add(sessions_need_processing,
310
 
                                     &session->scheduler.list);
311
 
    }
312
 
  }
313
 
}
314
 
 
315
 
 
316
 
/*
317
 
  This is used to add connections to the pool. This callback is invoked from
318
 
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
319
 
  written to it.
320
 
 
321
 
  NOTES
322
 
    This is only called by the thread that owns LOCK_event_loop.
323
 
*/
324
 
 
325
 
void libevent_add_session_callback(int Fd, short, void *)
326
 
{
327
 
  safe_mutex_assert_owner(&LOCK_event_loop);
328
 
 
329
 
  /* clear the pending events */
330
 
  char c;
331
 
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
332
 
  {}
333
 
 
334
 
  pthread_mutex_lock(&LOCK_session_add);
335
 
  while (sessions_need_adding)
336
 
  {
337
 
    /* pop the first session off the list */
338
 
    Session* session= (Session*)sessions_need_adding->data;
339
 
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
340
 
 
341
 
    pthread_mutex_unlock(&LOCK_session_add);
342
 
 
343
 
    if (!session->scheduler.logged_in || libevent_should_close_connection(session))
344
 
    {
345
 
      /*
346
 
        Add session to sessions_need_processing list. If it needs closing we'll close
347
 
        it outside of event_loop().
348
 
      */
349
 
      sessions_need_processing= list_add(sessions_need_processing,
350
 
                                     &session->scheduler.list);
351
 
    }
352
 
    else
353
 
    {
354
 
      /* Add to libevent */
355
 
      if (event_add(session->scheduler.io_event, NULL))
356
 
      {
357
 
        errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
358
 
        libevent_connection_close(session);
359
 
      }
360
 
      else
361
 
      {
362
 
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
363
 
                                      &session->scheduler.list);
364
 
      }
365
 
    }
366
 
    pthread_mutex_lock(&LOCK_session_add);
367
 
  }
368
 
  pthread_mutex_unlock(&LOCK_session_add);
369
 
}
370
 
 
371
 
 
372
 
/**
373
 
  Notify the thread pool about a new connection
374
 
 
375
 
  NOTES
376
 
    LOCK_thread_count is locked on entry. This function MUST unlock it!
377
 
*/
378
 
 
379
 
static void libevent_add_connection(Session *session)
380
 
{
381
 
  if (session->scheduler.init(session))
382
 
  {
383
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Scheduler init error in libevent_add_new_connection\n"));
384
 
    pthread_mutex_unlock(&LOCK_thread_count);
385
 
    libevent_connection_close(session);
386
 
    return;
387
 
  }
388
 
  threads.append(session);
389
 
  libevent_session_add(session);
390
 
 
391
 
  pthread_mutex_unlock(&LOCK_thread_count);
392
 
  return;
393
 
}
394
 
 
395
 
 
396
 
/**
397
 
  @brief Signal a waiting connection it's time to die.
398
 
 
399
 
  @details This function will signal libevent the Session should be killed.
400
 
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
401
 
    upon entry.
402
 
 
403
 
  @param[in]  session The connection to kill
404
 
*/
405
 
 
406
 
static void libevent_post_kill_notification(Session *)
407
 
{
408
 
  /*
409
 
    Note, we just wake up libevent with an event that a Session should be killed,
410
 
    It will search its list of sessions for session->killed ==  KILL_CONNECTION to
411
 
    find the Sessions it should kill.
412
 
 
413
 
    So we don't actually tell it which one and we don't actually use the
414
 
    Session being passed to us, but that's just a design detail that could change
415
 
    later.
416
 
  */
417
 
  char c= 0;
418
 
  assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
419
 
}
420
 
 
421
 
 
422
 
/*
423
 
  Close and delete a connection.
424
 
*/
425
 
 
426
 
static void libevent_connection_close(Session *session)
427
 
{
428
 
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages
429
 
 
430
 
  if (net_get_sd(&(session->net)) >= 0)                  // not already closed
431
 
  {
432
 
    end_connection(session);
433
 
    session->close_connection(0, 1);
434
 
  }
435
 
  session->scheduler.thread_detach();
436
 
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
437
 
  pthread_mutex_unlock(&LOCK_thread_count);
438
 
 
439
 
  return;
440
 
}
441
 
 
442
 
 
443
 
/*
444
 
  Returns true if we should close and delete a Session connection.
445
 
*/
446
 
 
447
 
static bool libevent_should_close_connection(Session* session)
448
 
{
449
 
  return net_should_close(&(session->net)) ||
450
 
         session->killed == Session::KILL_CONNECTION;
451
 
}
452
 
 
453
 
 
454
 
/*
455
 
  libevent_thread_proc is the outer loop of each thread in the thread pool.
456
 
  These procs only return/terminate on shutdown (kill_pool_threads == true).
457
 
*/
458
 
 
459
 
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
460
 
{
461
 
  if (init_new_connection_handler_thread())
462
 
  {
463
 
    my_thread_global_end();
464
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: my_thread_init() failed\n"));
465
 
    exit(1);
466
 
  }
467
 
 
468
 
  /*
469
 
    Signal libevent_init() when all threads has been created and are ready to
470
 
    receive events.
471
 
  */
472
 
  (void) pthread_mutex_lock(&LOCK_thread_count);
473
 
  created_threads++;
474
 
  if (created_threads == thread_pool_size)
475
 
    (void) pthread_cond_signal(&COND_thread_count);
476
 
  (void) pthread_mutex_unlock(&LOCK_thread_count);
477
 
 
478
 
  for (;;)
479
 
  {
480
 
    Session *session= NULL;
481
 
    (void) pthread_mutex_lock(&LOCK_event_loop);
482
 
 
483
 
    /* get session(s) to process */
484
 
    while (!sessions_need_processing)
485
 
    {
486
 
      if (kill_pool_threads)
487
 
      {
488
 
        /* the flag that we should die has been set */
489
 
        (void) pthread_mutex_unlock(&LOCK_event_loop);
490
 
        goto thread_exit;
491
 
      }
492
 
      event_loop(EVLOOP_ONCE);
493
 
    }
494
 
 
495
 
    /* pop the first session off the list */
496
 
    session= (Session*)sessions_need_processing->data;
497
 
    sessions_need_processing= list_delete(sessions_need_processing,
498
 
                                      sessions_need_processing);
499
 
 
500
 
    (void) pthread_mutex_unlock(&LOCK_event_loop);
501
 
 
502
 
    /* now we process the connection (session) */
503
 
 
504
 
    /* set up the session<->thread links. */
505
 
    session->thread_stack= (char*) &session;
506
 
 
507
 
    if (session->scheduler.thread_attach())
508
 
    {
509
 
      libevent_connection_close(session);
510
 
      continue;
511
 
    }
512
 
 
513
 
    /* is the connection logged in yet? */
514
 
    if (!session->scheduler.logged_in)
515
 
    {
516
 
      if (login_connection(session))
517
 
      {
518
 
        /* Failed to log in */
519
 
        libevent_connection_close(session);
520
 
        continue;
521
 
      }
522
 
      else
523
 
      {
524
 
        /* login successful */
525
 
        session->scheduler.logged_in= true;
526
 
        prepare_new_connection_state(session);
527
 
        if (!libevent_needs_immediate_processing(session))
528
 
          continue; /* New connection is now waiting for data in libevent*/
529
 
      }
530
 
    }
531
 
 
532
 
    do
533
 
    {
534
 
      /* Process a query */
535
 
      if (do_command(session))
536
 
      {
537
 
        libevent_connection_close(session);
538
 
        break;
539
 
      }
540
 
    } while (libevent_needs_immediate_processing(session));
541
 
  }
542
 
 
543
 
thread_exit:
544
 
  (void) pthread_mutex_lock(&LOCK_thread_count);
545
 
  killed_threads++;
546
 
  pthread_cond_broadcast(&COND_thread_count);
547
 
  (void) pthread_mutex_unlock(&LOCK_thread_count);
548
 
  my_thread_end();
549
 
  pthread_exit(0);
550
 
  return(0);                               /* purify: deadcode */
551
 
}
552
 
 
553
 
 
554
 
/*
555
 
  Returns true if the connection needs immediate processing and false if
556
 
  instead it's queued for libevent processing or closed,
557
 
*/
558
 
 
559
 
static bool libevent_needs_immediate_processing(Session *session)
560
 
{
561
 
  if (libevent_should_close_connection(session))
562
 
  {
563
 
    libevent_connection_close(session);
564
 
    return false;
565
 
  }
566
 
  /*
567
 
    If more data in the socket buffer, return true to process another command.
568
 
 
569
 
    Note: we cannot add for event processing because the whole request might
570
 
    already be buffered and we wouldn't receive an event.
571
 
  */
572
 
  if (net_more_data(&(session->net)))
573
 
    return true;
574
 
 
575
 
  session->scheduler.thread_detach();
576
 
  libevent_session_add(session);
577
 
  return false;
578
 
}
579
 
 
580
 
 
581
 
/*
582
 
  Adds a Session to queued for libevent processing.
583
 
 
584
 
  This call does not actually register the event with libevent.
585
 
  Instead, it places the Session onto a queue and signals libevent by writing
586
 
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
587
 
  be invoked which will find the Session on the queue and add it to libevent.
588
 
*/
589
 
 
590
 
static void libevent_session_add(Session* session)
591
 
{
592
 
  char c=0;
593
 
  pthread_mutex_lock(&LOCK_session_add);
594
 
  /* queue for libevent */
595
 
  sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
596
 
  /* notify libevent */
597
 
  assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
598
 
  pthread_mutex_unlock(&LOCK_session_add);
599
 
}
600
 
 
601
 
 
602
 
/**
603
 
  Wait until all pool threads have been deleted for clean shutdown
604
 
*/
605
 
 
606
 
static void libevent_end()
607
 
{
608
 
  (void) pthread_mutex_lock(&LOCK_thread_count);
609
 
 
610
 
  kill_pool_threads= true;
611
 
  while (killed_threads != created_threads)
612
 
  {
613
 
    /* wake up the event loop */
614
 
    char c= 0;
615
 
    assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
616
 
 
617
 
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
618
 
  }
619
 
  (void) pthread_mutex_unlock(&LOCK_thread_count);
620
 
 
621
 
  event_del(&session_add_event);
622
 
  close(session_add_pipe[0]);
623
 
  close(session_add_pipe[1]);
624
 
  event_del(&session_kill_event);
625
 
  close(session_kill_pipe[0]);
626
 
  close(session_kill_pipe[1]);
627
 
 
628
 
  (void) pthread_mutex_destroy(&LOCK_event_loop);
629
 
  (void) pthread_mutex_destroy(&LOCK_session_add);
630
 
  return;
631
 
}
632
 
 
633
 
 
634
 
void pool_of_threads_scheduler(scheduler_functions* func)
635
 
{
636
 
  func->max_threads= thread_pool_size;
637
 
  func->init= libevent_init;
638
 
  func->end=  libevent_end;
639
 
  func->post_kill_notification= libevent_post_kill_notification;
640
 
  func->add_connection= libevent_add_connection;
641
 
}