~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/scheduler.cc

Merged vcol stuff.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2007 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
  Implementation for the thread scheduler
 
18
*/
 
19
 
 
20
#include <drizzled/server_includes.h>
 
21
#include <libdrizzle/libdrizzle.h>
 
22
#include "event.h"
 
23
 
 
24
 
 
25
/*
 
26
  'Dummy' functions to be used when we don't need any handling for a scheduler
 
27
  event
 
28
 */
 
29
 
 
30
static bool init_dummy(void) {return 0;}
 
31
static void post_kill_dummy(THD *thd __attribute__((unused))) {}
 
32
static void end_dummy(void) {}
 
33
static bool end_thread_dummy(THD *thd __attribute__((unused)),
 
34
                             bool cache_thread __attribute__((unused)))
 
35
{ return 0; }
 
36
 
 
37
/*
 
38
  Initialize default scheduler with dummy functions so that setup functions
 
39
  only need to declare those that are relvant for their usage
 
40
*/
 
41
 
 
42
scheduler_functions::scheduler_functions()
 
43
  :init(init_dummy),
 
44
   init_new_connection_thread(init_new_connection_handler_thread),
 
45
   add_connection(0),                           // Must be defined
 
46
   post_kill_notification(post_kill_dummy),
 
47
   end_thread(end_thread_dummy), end(end_dummy)
 
48
{}
 
49
 
 
50
static uint32_t created_threads, killed_threads;
 
51
static bool kill_pool_threads;
 
52
 
 
53
static struct event thd_add_event;
 
54
static struct event thd_kill_event;
 
55
 
 
56
static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
 
57
static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
 
58
 
 
59
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
60
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
61
 
 
62
/*
 
63
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
 
64
  event_del) and thds_need_processing and thds_waiting_for_io.
 
65
*/
 
66
static pthread_mutex_t LOCK_event_loop;
 
67
static LIST *thds_need_processing; /* list of thds that needs some processing */
 
68
static LIST *thds_waiting_for_io; /* list of thds with added events */
 
69
 
 
70
pthread_handler_t libevent_thread_proc(void *arg);
 
71
static void libevent_end();
 
72
static bool libevent_needs_immediate_processing(THD *thd);
 
73
static void libevent_connection_close(THD *thd);
 
74
static bool libevent_should_close_connection(THD* thd);
 
75
static void libevent_thd_add(THD* thd);
 
76
void libevent_io_callback(int Fd, short Operation, void *ctx);
 
77
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
 
78
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
 
79
 
 
80
 
 
81
/*
 
82
  Create a pipe and set to non-blocking.
 
83
  Returns true if there is an error.
 
84
*/
 
85
 
 
86
static bool init_pipe(int pipe_fds[])
 
87
{
 
88
  int flags;
 
89
  return pipe(pipe_fds) < 0 ||
 
90
          (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
 
91
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
 
92
          (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
 
93
          fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
 
94
}
 
95
 
 
96
 
 
97
/*
 
98
  thd_scheduler keeps the link between THD and events.
 
99
  It's embedded in the THD class.
 
100
*/
 
101
 
 
102
thd_scheduler::thd_scheduler()
 
103
  : logged_in(false), io_event(NULL), thread_attached(false)
 
104
{  
 
105
  dbug_explain_buf[0]= 0;
 
106
}
 
107
 
 
108
 
 
109
thd_scheduler::~thd_scheduler()
 
110
{
 
111
  free(io_event);
 
112
}
 
113
 
 
114
 
 
115
bool thd_scheduler::init(THD *parent_thd)
 
116
{
 
117
  io_event=
 
118
    (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
 
119
    
 
120
  if (!io_event)
 
121
  {
 
122
    sql_print_error(_("Memory allocation error in thd_scheduler::init\n"));
 
123
    return true;
 
124
  }
 
125
  
 
126
  event_set(io_event, net_get_sd(&(parent_thd->net)), EV_READ, 
 
127
            libevent_io_callback, (void*)parent_thd);
 
128
    
 
129
  list.data= parent_thd;
 
130
  
 
131
  return false;
 
132
}
 
133
 
 
134
 
 
135
/*
 
136
  Attach/associate the connection with the OS thread, for command processing.
 
137
*/
 
138
 
 
139
bool thd_scheduler::thread_attach()
 
140
{
 
141
  assert(!thread_attached);
 
142
  THD* thd = (THD*)list.data;
 
143
  if (libevent_should_close_connection(thd) ||
 
144
      setup_connection_thread_globals(thd))
 
145
  {
 
146
    return true;
 
147
  }
 
148
  my_errno= 0;
 
149
  thd->mysys_var->abort= 0;
 
150
  thread_attached= true;
 
151
  swap_dbug_explain();
 
152
  return false;
 
153
}
 
154
 
 
155
 
 
156
/*
 
157
  Detach/disassociate the connection with the OS thread.
 
158
*/
 
159
 
 
160
void thd_scheduler::thread_detach()
 
161
{
 
162
  if (thread_attached)
 
163
  {
 
164
    THD* thd = (THD*)list.data;
 
165
    thd->mysys_var= NULL;
 
166
    thread_attached= false;
 
167
    swap_dbug_explain();
 
168
  }
 
169
}
 
170
 
 
171
 
 
172
/*
 
173
  Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
 
174
 
 
175
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
 
176
  thread during a command, but each command is handled by a different thread.
 
177
*/
 
178
void thd_scheduler::swap_dbug_explain()
 
179
{
 
180
  char buffer[sizeof(dbug_explain_buf)];
 
181
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
 
182
}
 
183
 
 
184
/**
 
185
  Create all threads for the thread pool
 
186
 
 
187
  NOTES
 
188
    After threads are created we wait until all threads has signaled that
 
189
    they have started before we return
 
190
 
 
191
  RETURN
 
192
    0  ok
 
193
    1  We got an error creating the thread pool
 
194
       In this case we will abort all created threads
 
195
*/
 
196
 
 
197
static bool libevent_init(void)
 
198
{
 
199
  uint32_t i;
 
200
 
 
201
  event_init();
 
202
  
 
203
  created_threads= 0;
 
204
  killed_threads= 0;
 
205
  kill_pool_threads= false;
 
206
  
 
207
  pthread_mutex_init(&LOCK_event_loop, NULL);
 
208
  pthread_mutex_init(&LOCK_thd_add, NULL);
 
209
  
 
210
  /* set up the pipe used to add new thds to the event pool */
 
211
  if (init_pipe(thd_add_pipe))
 
212
  {
 
213
    sql_print_error(_("init_pipe(thd_add_pipe) error in libevent_init\n"));
 
214
    return(1);
 
215
  }
 
216
  /* set up the pipe used to kill thds in the event queue */
 
217
  if (init_pipe(thd_kill_pipe))
 
218
  {
 
219
    sql_print_error(_("init_pipe(thd_kill_pipe) error in libevent_init\n"));
 
220
    close(thd_add_pipe[0]);
 
221
    close(thd_add_pipe[1]);
 
222
    return(1);
 
223
  }
 
224
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
 
225
            libevent_add_thd_callback, NULL);
 
226
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
 
227
            libevent_kill_thd_callback, NULL);
 
228
 
 
229
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
 
230
 {
 
231
   sql_print_error(_("thd_add_event event_add error in libevent_init\n"));
 
232
   libevent_end();
 
233
   return(1);
 
234
   
 
235
 }
 
236
  /* Set up the thread pool */
 
237
  created_threads= killed_threads= 0;
 
238
  pthread_mutex_lock(&LOCK_thread_count);
 
239
 
 
240
  for (i= 0; i < thread_pool_size; i++)
 
241
  {
 
242
    pthread_t thread;
 
243
    int error;
 
244
    if ((error= pthread_create(&thread, &connection_attrib,
 
245
                               libevent_thread_proc, 0)))
 
246
    {
 
247
      sql_print_error(_("Can't create completion port thread (error %d)"),
 
248
                      error);
 
249
      pthread_mutex_unlock(&LOCK_thread_count);
 
250
      libevent_end();                      // Cleanup
 
251
      return(true);
 
252
    }
 
253
  }
 
254
 
 
255
  /* Wait until all threads are created */
 
256
  while (created_threads != thread_pool_size)
 
257
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
 
258
  pthread_mutex_unlock(&LOCK_thread_count);
 
259
  
 
260
  return(false);
 
261
}
 
262
 
 
263
 
 
264
/*
 
265
  This is called when data is ready on the socket.
 
266
  
 
267
  NOTES
 
268
    This is only called by the thread that owns LOCK_event_loop.
 
269
  
 
270
    We add the thd that got the data to thds_need_processing, and 
 
271
    cause the libevent event_loop() to terminate. Then this same thread will
 
272
    return from event_loop and pick the thd value back up for processing.
 
273
*/
 
274
 
 
275
void libevent_io_callback(int, short, void *ctx)
 
276
{    
 
277
  safe_mutex_assert_owner(&LOCK_event_loop);
 
278
  THD *thd= (THD*)ctx;
 
279
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
 
280
  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
 
281
}
 
282
 
 
283
/*
 
284
  This is called when we have a thread we want to be killed.
 
285
  
 
286
  NOTES
 
287
    This is only called by the thread that owns LOCK_event_loop.
 
288
*/
 
289
 
 
290
void libevent_kill_thd_callback(int Fd, short, void*)
 
291
{    
 
292
  safe_mutex_assert_owner(&LOCK_event_loop);
 
293
 
 
294
  /* clear the pending events */
 
295
  char c;
 
296
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
297
  {}
 
298
 
 
299
  LIST* list= thds_waiting_for_io;
 
300
  while (list)
 
301
  {
 
302
    THD *thd= (THD*)list->data;
 
303
    list= list_rest(list);
 
304
    if (thd->killed == THD::KILL_CONNECTION)
 
305
    {
 
306
      /*
 
307
        Delete from libevent and add to the processing queue.
 
308
      */
 
309
      event_del(thd->scheduler.io_event);
 
310
      thds_waiting_for_io= list_delete(thds_waiting_for_io,
 
311
                                       &thd->scheduler.list);
 
312
      thds_need_processing= list_add(thds_need_processing,
 
313
                                     &thd->scheduler.list);
 
314
    }
 
315
  }
 
316
}
 
317
 
 
318
 
 
319
/*
 
320
  This is used to add connections to the pool. This callback is invoked from
 
321
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
 
322
  written to it.
 
323
  
 
324
  NOTES
 
325
    This is only called by the thread that owns LOCK_event_loop.
 
326
*/
 
327
 
 
328
void libevent_add_thd_callback(int Fd, short, void *)
 
329
 
330
  safe_mutex_assert_owner(&LOCK_event_loop);
 
331
 
 
332
  /* clear the pending events */
 
333
  char c;
 
334
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
335
  {}
 
336
 
 
337
  pthread_mutex_lock(&LOCK_thd_add);
 
338
  while (thds_need_adding)
 
339
  {
 
340
    /* pop the first thd off the list */
 
341
    THD* thd= (THD*)thds_need_adding->data;
 
342
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
 
343
 
 
344
    pthread_mutex_unlock(&LOCK_thd_add);
 
345
    
 
346
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
 
347
    {
 
348
      /*
 
349
        Add thd to thds_need_processing list. If it needs closing we'll close
 
350
        it outside of event_loop().
 
351
      */
 
352
      thds_need_processing= list_add(thds_need_processing,
 
353
                                     &thd->scheduler.list);
 
354
    }
 
355
    else
 
356
    {
 
357
      /* Add to libevent */
 
358
      if (event_add(thd->scheduler.io_event, NULL))
 
359
      {
 
360
        sql_print_error(_("event_add error in libevent_add_thd_callback\n"));
 
361
        libevent_connection_close(thd);
 
362
      } 
 
363
      else
 
364
      {
 
365
        thds_waiting_for_io= list_add(thds_waiting_for_io,
 
366
                                      &thd->scheduler.list);
 
367
      }
 
368
    }
 
369
    pthread_mutex_lock(&LOCK_thd_add);
 
370
  }
 
371
  pthread_mutex_unlock(&LOCK_thd_add);
 
372
}
 
373
 
 
374
 
 
375
/**
 
376
  Notify the thread pool about a new connection
 
377
 
 
378
  NOTES
 
379
    LOCK_thread_count is locked on entry. This function MUST unlock it!
 
380
*/
 
381
 
 
382
static void libevent_add_connection(THD *thd)
 
383
{
 
384
  if (thd->scheduler.init(thd))
 
385
  {
 
386
    sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
 
387
    pthread_mutex_unlock(&LOCK_thread_count);
 
388
    libevent_connection_close(thd);
 
389
    return;
 
390
  }
 
391
  threads.append(thd);
 
392
  libevent_thd_add(thd);
 
393
  
 
394
  pthread_mutex_unlock(&LOCK_thread_count);
 
395
  return;
 
396
}
 
397
 
 
398
 
 
399
/**
 
400
  @brief Signal a waiting connection it's time to die.
 
401
 
 
402
  @details This function will signal libevent the THD should be killed.
 
403
    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
 
404
    upon entry.
 
405
 
 
406
  @param[in]  thd The connection to kill
 
407
*/
 
408
 
 
409
static void libevent_post_kill_notification(THD *)
 
410
{
 
411
  /*
 
412
    Note, we just wake up libevent with an event that a THD should be killed,
 
413
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
 
414
    find the THDs it should kill.
 
415
    
 
416
    So we don't actually tell it which one and we don't actually use the
 
417
    THD being passed to us, but that's just a design detail that could change
 
418
    later.
 
419
  */
 
420
  char c= 0;
 
421
  write(thd_kill_pipe[1], &c, sizeof(c));
 
422
}
 
423
 
 
424
 
 
425
/*
 
426
  Close and delete a connection.
 
427
*/
 
428
 
 
429
static void libevent_connection_close(THD *thd)
 
430
{
 
431
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
 
432
 
 
433
  if (net_get_sd(&(thd->net)) >= 0)                  // not already closed
 
434
  {
 
435
    end_connection(thd);
 
436
    close_connection(thd, 0, 1);
 
437
  }
 
438
  thd->scheduler.thread_detach();
 
439
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
 
440
  pthread_mutex_unlock(&LOCK_thread_count);
 
441
 
 
442
  return;
 
443
}
 
444
 
 
445
 
 
446
/*
 
447
  Returns true if we should close and delete a THD connection.
 
448
*/
 
449
 
 
450
static bool libevent_should_close_connection(THD* thd)
 
451
{
 
452
  return net_should_close(&(thd->net)) ||
 
453
         thd->killed == THD::KILL_CONNECTION;
 
454
}
 
455
 
 
456
 
 
457
/*
 
458
  libevent_thread_proc is the outer loop of each thread in the thread pool.
 
459
  These procs only return/terminate on shutdown (kill_pool_threads == true).
 
460
*/
 
461
 
 
462
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
 
463
{
 
464
  if (init_new_connection_handler_thread())
 
465
  {
 
466
    my_thread_global_end();
 
467
    sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
 
468
    exit(1);
 
469
  }
 
470
 
 
471
  /*
 
472
    Signal libevent_init() when all threads has been created and are ready to
 
473
    receive events.
 
474
  */
 
475
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
476
  created_threads++;
 
477
  if (created_threads == thread_pool_size)
 
478
    (void) pthread_cond_signal(&COND_thread_count);
 
479
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
480
  
 
481
  for (;;)
 
482
  {
 
483
    THD *thd= NULL;
 
484
    (void) pthread_mutex_lock(&LOCK_event_loop);
 
485
    
 
486
    /* get thd(s) to process */
 
487
    while (!thds_need_processing)
 
488
    {
 
489
      if (kill_pool_threads)
 
490
      {
 
491
        /* the flag that we should die has been set */
 
492
        (void) pthread_mutex_unlock(&LOCK_event_loop);
 
493
        goto thread_exit;
 
494
      }
 
495
      event_loop(EVLOOP_ONCE);
 
496
    }
 
497
    
 
498
    /* pop the first thd off the list */
 
499
    thd= (THD*)thds_need_processing->data;
 
500
    thds_need_processing= list_delete(thds_need_processing,
 
501
                                      thds_need_processing);
 
502
    
 
503
    (void) pthread_mutex_unlock(&LOCK_event_loop);
 
504
    
 
505
    /* now we process the connection (thd) */
 
506
    
 
507
    /* set up the thd<->thread links. */
 
508
    thd->thread_stack= (char*) &thd;
 
509
    
 
510
    if (thd->scheduler.thread_attach())
 
511
    {
 
512
      libevent_connection_close(thd);
 
513
      continue;
 
514
    }
 
515
 
 
516
    /* is the connection logged in yet? */
 
517
    if (!thd->scheduler.logged_in)
 
518
    {
 
519
      if (login_connection(thd))
 
520
      {
 
521
        /* Failed to log in */
 
522
        libevent_connection_close(thd);
 
523
        continue;
 
524
      }
 
525
      else
 
526
      {
 
527
        /* login successful */
 
528
        thd->scheduler.logged_in= true;
 
529
        prepare_new_connection_state(thd);
 
530
        if (!libevent_needs_immediate_processing(thd))
 
531
          continue; /* New connection is now waiting for data in libevent*/
 
532
      }
 
533
    }
 
534
 
 
535
    do
 
536
    {
 
537
      /* Process a query */
 
538
      if (do_command(thd))
 
539
      {
 
540
        libevent_connection_close(thd);
 
541
        break;
 
542
      }
 
543
    } while (libevent_needs_immediate_processing(thd));
 
544
  }
 
545
  
 
546
thread_exit:
 
547
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
548
  killed_threads++;
 
549
  pthread_cond_broadcast(&COND_thread_count);
 
550
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
551
  my_thread_end();
 
552
  pthread_exit(0);
 
553
  return(0);                               /* purify: deadcode */
 
554
}
 
555
 
 
556
 
 
557
/*
 
558
  Returns true if the connection needs immediate processing and false if 
 
559
  instead it's queued for libevent processing or closed,
 
560
*/
 
561
 
 
562
static bool libevent_needs_immediate_processing(THD *thd)
 
563
{
 
564
  if (libevent_should_close_connection(thd))
 
565
  {
 
566
    libevent_connection_close(thd);
 
567
    return false;
 
568
  }
 
569
  /*
 
570
    If more data in the socket buffer, return true to process another command.
 
571
 
 
572
    Note: we cannot add for event processing because the whole request might
 
573
    already be buffered and we wouldn't receive an event.
 
574
  */
 
575
  if (net_more_data(&(thd->net)))
 
576
    return true;
 
577
  
 
578
  thd->scheduler.thread_detach();
 
579
  libevent_thd_add(thd);
 
580
  return false;
 
581
}
 
582
 
 
583
 
 
584
/*
 
585
  Adds a THD to queued for libevent processing.
 
586
  
 
587
  This call does not actually register the event with libevent.
 
588
  Instead, it places the THD onto a queue and signals libevent by writing
 
589
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
 
590
  be invoked which will find the THD on the queue and add it to libevent.
 
591
*/
 
592
 
 
593
static void libevent_thd_add(THD* thd)
 
594
{
 
595
  char c=0;
 
596
  pthread_mutex_lock(&LOCK_thd_add);
 
597
  /* queue for libevent */
 
598
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
 
599
  /* notify libevent */
 
600
  write(thd_add_pipe[1], &c, sizeof(c));
 
601
  pthread_mutex_unlock(&LOCK_thd_add);
 
602
}
 
603
 
 
604
 
 
605
/**
 
606
  Wait until all pool threads have been deleted for clean shutdown
 
607
*/
 
608
 
 
609
static void libevent_end()
 
610
{
 
611
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
612
  
 
613
  kill_pool_threads= true;
 
614
  while (killed_threads != created_threads)
 
615
  {
 
616
    /* wake up the event loop */
 
617
    char c= 0;
 
618
    write(thd_add_pipe[1], &c, sizeof(c));
 
619
 
 
620
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
621
  }
 
622
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
623
  
 
624
  event_del(&thd_add_event);
 
625
  close(thd_add_pipe[0]);
 
626
  close(thd_add_pipe[1]);
 
627
  event_del(&thd_kill_event);
 
628
  close(thd_kill_pipe[0]);
 
629
  close(thd_kill_pipe[1]);
 
630
 
 
631
  (void) pthread_mutex_destroy(&LOCK_event_loop);
 
632
  (void) pthread_mutex_destroy(&LOCK_thd_add);
 
633
  return;
 
634
}
 
635
 
 
636
 
 
637
void pool_of_threads_scheduler(scheduler_functions* func)
 
638
{
 
639
  func->max_threads= thread_pool_size;
 
640
  func->init= libevent_init;
 
641
  func->end=  libevent_end;
 
642
  func->post_kill_notification= libevent_post_kill_notification;
 
643
  func->add_connection= libevent_add_connection;
 
644
}