~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/scheduler.cc

  • Committer: Brian Aker
  • Date: 2008-10-29 13:46:43 UTC
  • Revision ID: brian@tangent.org-20081029134643-z6jcwjvyruhk2vlu
Updates for ignore file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2007 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  Implementation for the thread scheduler
 
18
*/
 
19
 
 
20
#include <drizzled/server_includes.h>
 
21
#include <libdrizzle/libdrizzle.h>
 
22
#include "event.h"
 
23
#include <drizzled/gettext.h>
 
24
 
 
25
 
 
26
/*
 
27
  'Dummy' functions to be used when we don't need any handling for a scheduler
 
28
  event
 
29
 */
 
30
 
 
31
static bool init_dummy(void) {return 0;}
 
32
static void post_kill_dummy(Session *session __attribute__((unused))) {}
 
33
static void end_dummy(void) {}
 
34
static bool end_thread_dummy(Session *session __attribute__((unused)),
 
35
                             bool cache_thread __attribute__((unused)))
 
36
{ return 0; }
 
37
 
 
38
/*
 
39
  Initialize default scheduler with dummy functions so that setup functions
 
40
  only need to declare those that are relvant for their usage
 
41
*/
 
42
 
 
43
scheduler_functions::scheduler_functions()
 
44
  :init(init_dummy),
 
45
   init_new_connection_thread(init_new_connection_handler_thread),
 
46
   add_connection(0),                           // Must be defined
 
47
   post_kill_notification(post_kill_dummy),
 
48
   end_thread(end_thread_dummy), end(end_dummy)
 
49
{}
 
50
 
 
51
static uint32_t created_threads, killed_threads;
 
52
static bool kill_pool_threads;
 
53
 
 
54
static struct event session_add_event;
 
55
static struct event session_kill_event;
 
56
 
 
57
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
 
58
static LIST *sessions_need_adding;    /* list of sessions to add to libevent queue */
 
59
 
 
60
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
61
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
62
 
 
63
/*
 
64
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
 
65
  event_del) and sessions_need_processing and sessions_waiting_for_io.
 
66
*/
 
67
static pthread_mutex_t LOCK_event_loop;
 
68
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
 
69
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
 
70
 
 
71
pthread_handler_t libevent_thread_proc(void *arg);
 
72
static void libevent_end();
 
73
static bool libevent_needs_immediate_processing(Session *session);
 
74
static void libevent_connection_close(Session *session);
 
75
static bool libevent_should_close_connection(Session* session);
 
76
static void libevent_session_add(Session* session);
 
77
void libevent_io_callback(int Fd, short Operation, void *ctx);
 
78
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
 
79
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
 
80
 
 
81
 
 
82
/*
 
83
  Create a pipe and set to non-blocking.
 
84
  Returns true if there is an error.
 
85
*/
 
86
 
 
87
static bool init_pipe(int pipe_fds[])
 
88
{
 
89
  int flags;
 
90
  return pipe(pipe_fds) < 0 ||
 
91
          (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
 
92
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
 
93
          (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
 
94
          fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
 
95
}
 
96
 
 
97
 
 
98
/*
 
99
  session_scheduler keeps the link between Session and events.
 
100
  It's embedded in the Session class.
 
101
*/
 
102
 
 
103
session_scheduler::session_scheduler()
 
104
  : logged_in(false), io_event(NULL), thread_attached(false)
 
105
{  
 
106
}
 
107
 
 
108
 
 
109
session_scheduler::~session_scheduler()
 
110
{
 
111
  free(io_event);
 
112
}
 
113
 
 
114
 
 
115
session_scheduler::session_scheduler(const session_scheduler&)
 
116
  : logged_in(false), io_event(NULL), thread_attached(false)
 
117
{}
 
118
 
 
119
void session_scheduler::operator=(const session_scheduler&)
 
120
{}
 
121
 
 
122
bool session_scheduler::init(Session *parent_session)
 
123
{
 
124
  io_event=
 
125
    (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
 
126
    
 
127
  if (!io_event)
 
128
  {
 
129
    sql_print_error(_("Memory allocation error in session_scheduler::init\n"));
 
130
    return true;
 
131
  }
 
132
  
 
133
  event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ, 
 
134
            libevent_io_callback, (void*)parent_session);
 
135
    
 
136
  list.data= parent_session;
 
137
  
 
138
  return false;
 
139
}
 
140
 
 
141
 
 
142
/*
 
143
  Attach/associate the connection with the OS thread, for command processing.
 
144
*/
 
145
 
 
146
bool session_scheduler::thread_attach()
 
147
{
 
148
  assert(!thread_attached);
 
149
  Session* session = (Session*)list.data;
 
150
  if (libevent_should_close_connection(session) ||
 
151
      setup_connection_thread_globals(session))
 
152
  {
 
153
    return true;
 
154
  }
 
155
  my_errno= 0;
 
156
  session->mysys_var->abort= 0;
 
157
  thread_attached= true;
 
158
 
 
159
  return false;
 
160
}
 
161
 
 
162
 
 
163
/*
 
164
  Detach/disassociate the connection with the OS thread.
 
165
*/
 
166
 
 
167
void session_scheduler::thread_detach()
 
168
{
 
169
  if (thread_attached)
 
170
  {
 
171
    Session* session = (Session*)list.data;
 
172
    session->mysys_var= NULL;
 
173
    thread_attached= false;
 
174
  }
 
175
}
 
176
 
 
177
/**
 
178
  Create all threads for the thread pool
 
179
 
 
180
  NOTES
 
181
    After threads are created we wait until all threads has signaled that
 
182
    they have started before we return
 
183
 
 
184
  RETURN
 
185
    0  ok
 
186
    1  We got an error creating the thread pool
 
187
       In this case we will abort all created threads
 
188
*/
 
189
 
 
190
static bool libevent_init(void)
 
191
{
 
192
  uint32_t i;
 
193
 
 
194
  event_init();
 
195
  
 
196
  created_threads= 0;
 
197
  killed_threads= 0;
 
198
  kill_pool_threads= false;
 
199
  
 
200
  pthread_mutex_init(&LOCK_event_loop, NULL);
 
201
  pthread_mutex_init(&LOCK_session_add, NULL);
 
202
  
 
203
  /* set up the pipe used to add new sessions to the event pool */
 
204
  if (init_pipe(session_add_pipe))
 
205
  {
 
206
    sql_print_error(_("init_pipe(session_add_pipe) error in libevent_init\n"));
 
207
    return(1);
 
208
  }
 
209
  /* set up the pipe used to kill sessions in the event queue */
 
210
  if (init_pipe(session_kill_pipe))
 
211
  {
 
212
    sql_print_error(_("init_pipe(session_kill_pipe) error in libevent_init\n"));
 
213
    close(session_add_pipe[0]);
 
214
    close(session_add_pipe[1]);
 
215
    return(1);
 
216
  }
 
217
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
 
218
            libevent_add_session_callback, NULL);
 
219
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
 
220
            libevent_kill_session_callback, NULL);
 
221
 
 
222
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
 
223
 {
 
224
   sql_print_error(_("session_add_event event_add error in libevent_init\n"));
 
225
   libevent_end();
 
226
   return(1);
 
227
   
 
228
 }
 
229
  /* Set up the thread pool */
 
230
  created_threads= killed_threads= 0;
 
231
  pthread_mutex_lock(&LOCK_thread_count);
 
232
 
 
233
  for (i= 0; i < thread_pool_size; i++)
 
234
  {
 
235
    pthread_t thread;
 
236
    int error;
 
237
    if ((error= pthread_create(&thread, &connection_attrib,
 
238
                               libevent_thread_proc, 0)))
 
239
    {
 
240
      sql_print_error(_("Can't create completion port thread (error %d)"),
 
241
                      error);
 
242
      pthread_mutex_unlock(&LOCK_thread_count);
 
243
      libevent_end();                      // Cleanup
 
244
      return(true);
 
245
    }
 
246
  }
 
247
 
 
248
  /* Wait until all threads are created */
 
249
  while (created_threads != thread_pool_size)
 
250
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
 
251
  pthread_mutex_unlock(&LOCK_thread_count);
 
252
  
 
253
  return(false);
 
254
}
 
255
 
 
256
 
 
257
/*
 
258
  This is called when data is ready on the socket.
 
259
  
 
260
  NOTES
 
261
    This is only called by the thread that owns LOCK_event_loop.
 
262
  
 
263
    We add the session that got the data to sessions_need_processing, and 
 
264
    cause the libevent event_loop() to terminate. Then this same thread will
 
265
    return from event_loop and pick the session value back up for processing.
 
266
*/
 
267
 
 
268
void libevent_io_callback(int, short, void *ctx)
 
269
{    
 
270
  safe_mutex_assert_owner(&LOCK_event_loop);
 
271
  Session *session= (Session*)ctx;
 
272
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
 
273
  sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
 
274
}
 
275
 
 
276
/*
 
277
  This is called when we have a thread we want to be killed.
 
278
  
 
279
  NOTES
 
280
    This is only called by the thread that owns LOCK_event_loop.
 
281
*/
 
282
 
 
283
void libevent_kill_session_callback(int Fd, short, void*)
 
284
{    
 
285
  safe_mutex_assert_owner(&LOCK_event_loop);
 
286
 
 
287
  /* clear the pending events */
 
288
  char c;
 
289
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
290
  {}
 
291
 
 
292
  LIST* list= sessions_waiting_for_io;
 
293
  while (list)
 
294
  {
 
295
    Session *session= (Session*)list->data;
 
296
    list= list_rest(list);
 
297
    if (session->killed == Session::KILL_CONNECTION)
 
298
    {
 
299
      /*
 
300
        Delete from libevent and add to the processing queue.
 
301
      */
 
302
      event_del(session->scheduler.io_event);
 
303
      sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
 
304
                                       &session->scheduler.list);
 
305
      sessions_need_processing= list_add(sessions_need_processing,
 
306
                                     &session->scheduler.list);
 
307
    }
 
308
  }
 
309
}
 
310
 
 
311
 
 
312
/*
 
313
  This is used to add connections to the pool. This callback is invoked from
 
314
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
 
315
  written to it.
 
316
  
 
317
  NOTES
 
318
    This is only called by the thread that owns LOCK_event_loop.
 
319
*/
 
320
 
 
321
void libevent_add_session_callback(int Fd, short, void *)
 
322
 
323
  safe_mutex_assert_owner(&LOCK_event_loop);
 
324
 
 
325
  /* clear the pending events */
 
326
  char c;
 
327
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
328
  {}
 
329
 
 
330
  pthread_mutex_lock(&LOCK_session_add);
 
331
  while (sessions_need_adding)
 
332
  {
 
333
    /* pop the first session off the list */
 
334
    Session* session= (Session*)sessions_need_adding->data;
 
335
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
 
336
 
 
337
    pthread_mutex_unlock(&LOCK_session_add);
 
338
    
 
339
    if (!session->scheduler.logged_in || libevent_should_close_connection(session))
 
340
    {
 
341
      /*
 
342
        Add session to sessions_need_processing list. If it needs closing we'll close
 
343
        it outside of event_loop().
 
344
      */
 
345
      sessions_need_processing= list_add(sessions_need_processing,
 
346
                                     &session->scheduler.list);
 
347
    }
 
348
    else
 
349
    {
 
350
      /* Add to libevent */
 
351
      if (event_add(session->scheduler.io_event, NULL))
 
352
      {
 
353
        sql_print_error(_("event_add error in libevent_add_session_callback\n"));
 
354
        libevent_connection_close(session);
 
355
      } 
 
356
      else
 
357
      {
 
358
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
 
359
                                      &session->scheduler.list);
 
360
      }
 
361
    }
 
362
    pthread_mutex_lock(&LOCK_session_add);
 
363
  }
 
364
  pthread_mutex_unlock(&LOCK_session_add);
 
365
}
 
366
 
 
367
 
 
368
/**
 
369
  Notify the thread pool about a new connection
 
370
 
 
371
  NOTES
 
372
    LOCK_thread_count is locked on entry. This function MUST unlock it!
 
373
*/
 
374
 
 
375
static void libevent_add_connection(Session *session)
 
376
{
 
377
  if (session->scheduler.init(session))
 
378
  {
 
379
    sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
 
380
    pthread_mutex_unlock(&LOCK_thread_count);
 
381
    libevent_connection_close(session);
 
382
    return;
 
383
  }
 
384
  threads.append(session);
 
385
  libevent_session_add(session);
 
386
  
 
387
  pthread_mutex_unlock(&LOCK_thread_count);
 
388
  return;
 
389
}
 
390
 
 
391
 
 
392
/**
 
393
  @brief Signal a waiting connection it's time to die.
 
394
 
 
395
  @details This function will signal libevent the Session should be killed.
 
396
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
 
397
    upon entry.
 
398
 
 
399
  @param[in]  session The connection to kill
 
400
*/
 
401
 
 
402
static void libevent_post_kill_notification(Session *)
 
403
{
 
404
  /*
 
405
    Note, we just wake up libevent with an event that a Session should be killed,
 
406
    It will search its list of sessions for session->killed ==  KILL_CONNECTION to
 
407
    find the Sessions it should kill.
 
408
    
 
409
    So we don't actually tell it which one and we don't actually use the
 
410
    Session being passed to us, but that's just a design detail that could change
 
411
    later.
 
412
  */
 
413
  char c= 0;
 
414
  assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
 
415
}
 
416
 
 
417
 
 
418
/*
 
419
  Close and delete a connection.
 
420
*/
 
421
 
 
422
static void libevent_connection_close(Session *session)
 
423
{
 
424
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages
 
425
 
 
426
  if (net_get_sd(&(session->net)) >= 0)                  // not already closed
 
427
  {
 
428
    end_connection(session);
 
429
    close_connection(session, 0, 1);
 
430
  }
 
431
  session->scheduler.thread_detach();
 
432
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
 
433
  pthread_mutex_unlock(&LOCK_thread_count);
 
434
 
 
435
  return;
 
436
}
 
437
 
 
438
 
 
439
/*
 
440
  Returns true if we should close and delete a Session connection.
 
441
*/
 
442
 
 
443
static bool libevent_should_close_connection(Session* session)
 
444
{
 
445
  return net_should_close(&(session->net)) ||
 
446
         session->killed == Session::KILL_CONNECTION;
 
447
}
 
448
 
 
449
 
 
450
/*
 
451
  libevent_thread_proc is the outer loop of each thread in the thread pool.
 
452
  These procs only return/terminate on shutdown (kill_pool_threads == true).
 
453
*/
 
454
 
 
455
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
 
456
{
 
457
  if (init_new_connection_handler_thread())
 
458
  {
 
459
    my_thread_global_end();
 
460
    sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
 
461
    exit(1);
 
462
  }
 
463
 
 
464
  /*
 
465
    Signal libevent_init() when all threads has been created and are ready to
 
466
    receive events.
 
467
  */
 
468
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
469
  created_threads++;
 
470
  if (created_threads == thread_pool_size)
 
471
    (void) pthread_cond_signal(&COND_thread_count);
 
472
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
473
  
 
474
  for (;;)
 
475
  {
 
476
    Session *session= NULL;
 
477
    (void) pthread_mutex_lock(&LOCK_event_loop);
 
478
    
 
479
    /* get session(s) to process */
 
480
    while (!sessions_need_processing)
 
481
    {
 
482
      if (kill_pool_threads)
 
483
      {
 
484
        /* the flag that we should die has been set */
 
485
        (void) pthread_mutex_unlock(&LOCK_event_loop);
 
486
        goto thread_exit;
 
487
      }
 
488
      event_loop(EVLOOP_ONCE);
 
489
    }
 
490
    
 
491
    /* pop the first session off the list */
 
492
    session= (Session*)sessions_need_processing->data;
 
493
    sessions_need_processing= list_delete(sessions_need_processing,
 
494
                                      sessions_need_processing);
 
495
    
 
496
    (void) pthread_mutex_unlock(&LOCK_event_loop);
 
497
    
 
498
    /* now we process the connection (session) */
 
499
    
 
500
    /* set up the session<->thread links. */
 
501
    session->thread_stack= (char*) &session;
 
502
    
 
503
    if (session->scheduler.thread_attach())
 
504
    {
 
505
      libevent_connection_close(session);
 
506
      continue;
 
507
    }
 
508
 
 
509
    /* is the connection logged in yet? */
 
510
    if (!session->scheduler.logged_in)
 
511
    {
 
512
      if (login_connection(session))
 
513
      {
 
514
        /* Failed to log in */
 
515
        libevent_connection_close(session);
 
516
        continue;
 
517
      }
 
518
      else
 
519
      {
 
520
        /* login successful */
 
521
        session->scheduler.logged_in= true;
 
522
        prepare_new_connection_state(session);
 
523
        if (!libevent_needs_immediate_processing(session))
 
524
          continue; /* New connection is now waiting for data in libevent*/
 
525
      }
 
526
    }
 
527
 
 
528
    do
 
529
    {
 
530
      /* Process a query */
 
531
      if (do_command(session))
 
532
      {
 
533
        libevent_connection_close(session);
 
534
        break;
 
535
      }
 
536
    } while (libevent_needs_immediate_processing(session));
 
537
  }
 
538
  
 
539
thread_exit:
 
540
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
541
  killed_threads++;
 
542
  pthread_cond_broadcast(&COND_thread_count);
 
543
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
544
  my_thread_end();
 
545
  pthread_exit(0);
 
546
  return(0);                               /* purify: deadcode */
 
547
}
 
548
 
 
549
 
 
550
/*
 
551
  Returns true if the connection needs immediate processing and false if 
 
552
  instead it's queued for libevent processing or closed,
 
553
*/
 
554
 
 
555
static bool libevent_needs_immediate_processing(Session *session)
 
556
{
 
557
  if (libevent_should_close_connection(session))
 
558
  {
 
559
    libevent_connection_close(session);
 
560
    return false;
 
561
  }
 
562
  /*
 
563
    If more data in the socket buffer, return true to process another command.
 
564
 
 
565
    Note: we cannot add for event processing because the whole request might
 
566
    already be buffered and we wouldn't receive an event.
 
567
  */
 
568
  if (net_more_data(&(session->net)))
 
569
    return true;
 
570
  
 
571
  session->scheduler.thread_detach();
 
572
  libevent_session_add(session);
 
573
  return false;
 
574
}
 
575
 
 
576
 
 
577
/*
 
578
  Adds a Session to queued for libevent processing.
 
579
  
 
580
  This call does not actually register the event with libevent.
 
581
  Instead, it places the Session onto a queue and signals libevent by writing
 
582
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
 
583
  be invoked which will find the Session on the queue and add it to libevent.
 
584
*/
 
585
 
 
586
static void libevent_session_add(Session* session)
 
587
{
 
588
  char c=0;
 
589
  pthread_mutex_lock(&LOCK_session_add);
 
590
  /* queue for libevent */
 
591
  sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
 
592
  /* notify libevent */
 
593
  assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
594
  pthread_mutex_unlock(&LOCK_session_add);
 
595
}
 
596
 
 
597
 
 
598
/**
 
599
  Wait until all pool threads have been deleted for clean shutdown
 
600
*/
 
601
 
 
602
static void libevent_end()
 
603
{
 
604
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
605
  
 
606
  kill_pool_threads= true;
 
607
  while (killed_threads != created_threads)
 
608
  {
 
609
    /* wake up the event loop */
 
610
    char c= 0;
 
611
    assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
612
 
 
613
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
614
  }
 
615
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
616
  
 
617
  event_del(&session_add_event);
 
618
  close(session_add_pipe[0]);
 
619
  close(session_add_pipe[1]);
 
620
  event_del(&session_kill_event);
 
621
  close(session_kill_pipe[0]);
 
622
  close(session_kill_pipe[1]);
 
623
 
 
624
  (void) pthread_mutex_destroy(&LOCK_event_loop);
 
625
  (void) pthread_mutex_destroy(&LOCK_session_add);
 
626
  return;
 
627
}
 
628
 
 
629
 
 
630
void pool_of_threads_scheduler(scheduler_functions* func)
 
631
{
 
632
  func->max_threads= thread_pool_size;
 
633
  func->init= libevent_init;
 
634
  func->end=  libevent_end;
 
635
  func->post_kill_notification= libevent_post_kill_notification;
 
636
  func->add_connection= libevent_add_connection;
 
637
}