~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2007 MySQL AB
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
/*
17
  Implementation for the thread scheduler
18
*/
19
20
#ifdef USE_PRAGMA_INTERFACE
21
#pragma implementation
22
#endif
23
24
#include <mysql_priv.h>
25
#include "event.h"
26
27
28
/*
29
  'Dummy' functions to be used when we don't need any handling for a scheduler
30
  event
31
 */
32
33
static bool init_dummy(void) {return 0;}
34
static void post_kill_dummy(THD *thd) {}  
35
static void end_dummy(void) {}
36
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
37
38
/*
39
  Initialize default scheduler with dummy functions so that setup functions
40
  only need to declare those that are relvant for their usage
41
*/
42
43
scheduler_functions::scheduler_functions()
44
  :init(init_dummy),
45
   init_new_connection_thread(init_new_connection_handler_thread),
46
   add_connection(0),                           // Must be defined
47
   post_kill_notification(post_kill_dummy),
48
   end_thread(end_thread_dummy), end(end_dummy)
49
{}
50
51
static uint created_threads, killed_threads;
52
static bool kill_pool_threads;
53
54
static struct event thd_add_event;
55
static struct event thd_kill_event;
56
57
static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
58
static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
59
60
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
61
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
62
63
/*
64
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
65
  event_del) and thds_need_processing and thds_waiting_for_io.
66
*/
67
static pthread_mutex_t LOCK_event_loop;
68
static LIST *thds_need_processing; /* list of thds that needs some processing */
69
static LIST *thds_waiting_for_io; /* list of thds with added events */
70
71
pthread_handler_t libevent_thread_proc(void *arg);
72
static void libevent_end();
73
static bool libevent_needs_immediate_processing(THD *thd);
74
static void libevent_connection_close(THD *thd);
75
static bool libevent_should_close_connection(THD* thd);
76
static void libevent_thd_add(THD* thd);
77
void libevent_io_callback(int Fd, short Operation, void *ctx);
78
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
79
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
80
81
82
/*
  Create a pipe and switch BOTH of its ends to non-blocking mode.

  @param pipe_fds  Array of two ints; on success [0] is the read end and
                   [1] is the write end.

  @return false on success, true if pipe() or any fcntl() call failed.
          If the pipe was created but could not be made non-blocking, both
          descriptors are closed again before returning, so nothing leaks.
*/

static bool init_pipe(int pipe_fds[])
{
  if (pipe(pipe_fds) < 0)
    return true;

  /* Index 0 is the read end, index 1 the write end; treat them alike. */
  for (int end= 0; end < 2; end++)
  {
    int flags= fcntl(pipe_fds[end], F_GETFL);
    if (flags == -1 ||
        fcntl(pipe_fds[end], F_SETFL, flags | O_NONBLOCK) == -1)
    {
      close(pipe_fds[0]);
      close(pipe_fds[1]);
      return true;
    }
  }
  return false;
}
96
97
98
/*
99
  thd_scheduler keeps the link between THD and events.
100
  It's embedded in the THD class.
101
*/
102
103
thd_scheduler::thd_scheduler()
55 by brian
Update for using real bool types.
104
  : logged_in(false), io_event(NULL), thread_attached(false)
1 by brian
clean slate
105
{  
106
#ifndef DBUG_OFF
107
  dbug_explain_buf[0]= 0;
108
#endif
109
}
110
111
112
/*
  Free the event structure allocated by init(). io_event may still be NULL
  here, hence MY_ALLOW_ZERO_PTR.
*/
thd_scheduler::~thd_scheduler()
{
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
}
116
117
118
/*
  Allocate this connection's libevent event and bind it to the THD.

  @param parent_thd  The THD this scheduler object is embedded in.

  @return false on success, true if the event structure could not be
          allocated (an error is logged in that case).
*/
bool thd_scheduler::init(THD *parent_thd)
{
  struct event *ev=
    (struct event*) my_malloc(sizeof(*io_event), MYF(MY_ZEROFILL|MY_WME));

  io_event= ev;
  if (ev == NULL)
  {
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
    return true;
  }

  /* Read readiness on the connection's socket triggers libevent_io_callback. */
  event_set(ev, parent_thd->net.vio->sd, EV_READ,
            libevent_io_callback, (void*) parent_thd);

  /* The embedded list node is what the scheduler queues link together. */
  list.data= parent_thd;

  return false;
}
136
137
138
/*
139
  Attach/associate the connection with the OS thread, for command processing.
140
*/
141
142
bool thd_scheduler::thread_attach()
143
{
144
  DBUG_ASSERT(!thread_attached);
145
  THD* thd = (THD*)list.data;
146
  if (libevent_should_close_connection(thd) ||
147
      setup_connection_thread_globals(thd))
148
  {
55 by brian
Update for using real bool types.
149
    return true;
1 by brian
clean slate
150
  }
151
  my_errno= 0;
152
  thd->mysys_var->abort= 0;
55 by brian
Update for using real bool types.
153
  thread_attached= true;
1 by brian
clean slate
154
#ifndef DBUG_OFF
155
  swap_dbug_explain();
156
#endif
55 by brian
Update for using real bool types.
157
  return false;
1 by brian
clean slate
158
}
159
160
161
/*
162
  Detach/disassociate the connection with the OS thread.
163
*/
164
165
void thd_scheduler::thread_detach()
166
{
167
  if (thread_attached)
168
  {
169
    THD* thd = (THD*)list.data;
170
    thd->mysys_var= NULL;
55 by brian
Update for using real bool types.
171
    thread_attached= false;
1 by brian
clean slate
172
#ifndef DBUG_OFF
173
    swap_dbug_explain();
174
#endif
175
  }
176
}
177
178
179
/*
180
  Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
181
182
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
183
  thread during a command, but each command is handled by a different thread.
184
*/
185
186
#ifndef DBUG_OFF
187
void thd_scheduler::swap_dbug_explain()
188
{
189
  char buffer[sizeof(dbug_explain_buf)];
190
  if (DBUG_EXPLAIN(buffer, sizeof(buffer)))
191
    sql_print_error("DBUG_EXPLAIN buffer too small.\n");
192
  DBUG_POP();
193
  DBUG_PUSH(dbug_explain_buf);
194
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
195
}
196
#endif
197
198
/**
199
  Create all threads for the thread pool
200
201
  NOTES
202
    After threads are created we wait until all threads has signaled that
203
    they have started before we return
204
205
  RETURN
206
    0  ok
207
    1  We got an error creating the thread pool
208
       In this case we will abort all created threads
209
*/
210
211
static bool libevent_init(void)
212
{
213
  uint i;
214
  DBUG_ENTER("libevent_init");
215
216
  event_init();
217
  
218
  created_threads= 0;
219
  killed_threads= 0;
55 by brian
Update for using real bool types.
220
  kill_pool_threads= false;
1 by brian
clean slate
221
  
222
  pthread_mutex_init(&LOCK_event_loop, NULL);
223
  pthread_mutex_init(&LOCK_thd_add, NULL);
224
  
225
  /* set up the pipe used to add new thds to the event pool */
226
  if (init_pipe(thd_add_pipe))
227
  {
228
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
229
    DBUG_RETURN(1);
230
  }
231
  /* set up the pipe used to kill thds in the event queue */
232
  if (init_pipe(thd_kill_pipe))
233
  {
234
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
235
    close(thd_add_pipe[0]);
236
    close(thd_add_pipe[1]);
237
    DBUG_RETURN(1);
238
  }
239
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
240
            libevent_add_thd_callback, NULL);
241
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
242
            libevent_kill_thd_callback, NULL);
243
 
244
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
245
 {
246
   sql_print_error("thd_add_event event_add error in libevent_init\n");
247
   libevent_end();
248
   DBUG_RETURN(1);
249
   
250
 }
251
  /* Set up the thread pool */
252
  created_threads= killed_threads= 0;
253
  pthread_mutex_lock(&LOCK_thread_count);
254
255
  for (i= 0; i < thread_pool_size; i++)
256
  {
257
    pthread_t thread;
258
    int error;
259
    if ((error= pthread_create(&thread, &connection_attrib,
260
                               libevent_thread_proc, 0)))
261
    {
262
      sql_print_error("Can't create completion port thread (error %d)",
263
                      error);
264
      pthread_mutex_unlock(&LOCK_thread_count);
265
      libevent_end();                      // Cleanup
55 by brian
Update for using real bool types.
266
      DBUG_RETURN(true);
1 by brian
clean slate
267
    }
268
  }
269
270
  /* Wait until all threads are created */
271
  while (created_threads != thread_pool_size)
272
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
273
  pthread_mutex_unlock(&LOCK_thread_count);
274
  
275
  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
55 by brian
Update for using real bool types.
276
  DBUG_RETURN(false);
1 by brian
clean slate
277
}
278
279
280
/*
281
  This is called when data is ready on the socket.
282
  
283
  NOTES
284
    This is only called by the thread that owns LOCK_event_loop.
285
  
286
    We add the thd that got the data to thds_need_processing, and 
287
    cause the libevent event_loop() to terminate. Then this same thread will
288
    return from event_loop and pick the thd value back up for processing.
289
*/
290
291
void libevent_io_callback(int, short, void *ctx)
292
{    
293
  safe_mutex_assert_owner(&LOCK_event_loop);
294
  THD *thd= (THD*)ctx;
295
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
296
  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
297
}
298
299
/*
300
  This is called when we have a thread we want to be killed.
301
  
302
  NOTES
303
    This is only called by the thread that owns LOCK_event_loop.
304
*/
305
306
void libevent_kill_thd_callback(int Fd, short, void*)
307
{    
308
  safe_mutex_assert_owner(&LOCK_event_loop);
309
310
  /* clear the pending events */
311
  char c;
312
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
313
  {}
314
315
  LIST* list= thds_waiting_for_io;
316
  while (list)
317
  {
318
    THD *thd= (THD*)list->data;
319
    list= list_rest(list);
320
    if (thd->killed == THD::KILL_CONNECTION)
321
    {
322
      /*
323
        Delete from libevent and add to the processing queue.
324
      */
325
      event_del(thd->scheduler.io_event);
326
      thds_waiting_for_io= list_delete(thds_waiting_for_io,
327
                                       &thd->scheduler.list);
328
      thds_need_processing= list_add(thds_need_processing,
329
                                     &thd->scheduler.list);
330
    }
331
  }
332
}
333
334
335
/*
336
  This is used to add connections to the pool. This callback is invoked from
337
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
338
  written to it.
339
  
340
  NOTES
341
    This is only called by the thread that owns LOCK_event_loop.
342
*/
343
344
void libevent_add_thd_callback(int Fd, short, void *)
345
{ 
346
  safe_mutex_assert_owner(&LOCK_event_loop);
347
348
  /* clear the pending events */
349
  char c;
350
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
351
  {}
352
353
  pthread_mutex_lock(&LOCK_thd_add);
354
  while (thds_need_adding)
355
  {
356
    /* pop the first thd off the list */
357
    THD* thd= (THD*)thds_need_adding->data;
358
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
359
360
    pthread_mutex_unlock(&LOCK_thd_add);
361
    
362
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
363
    {
364
      /*
365
        Add thd to thds_need_processing list. If it needs closing we'll close
366
        it outside of event_loop().
367
      */
368
      thds_need_processing= list_add(thds_need_processing,
369
                                     &thd->scheduler.list);
370
    }
371
    else
372
    {
373
      /* Add to libevent */
374
      if (event_add(thd->scheduler.io_event, NULL))
375
      {
376
        sql_print_error("event_add error in libevent_add_thd_callback\n");
377
        libevent_connection_close(thd);
378
      } 
379
      else
380
      {
381
        thds_waiting_for_io= list_add(thds_waiting_for_io,
382
                                      &thd->scheduler.list);
383
      }
384
    }
385
    pthread_mutex_lock(&LOCK_thd_add);
386
  }
387
  pthread_mutex_unlock(&LOCK_thd_add);
388
}
389
390
391
/**
392
  Notify the thread pool about a new connection
393
394
  NOTES
395
    LOCK_thread_count is locked on entry. This function MUST unlock it!
396
*/
397
398
static void libevent_add_connection(THD *thd)
399
{
400
  DBUG_ENTER("libevent_add_connection");
401
  DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
402
                       (long) thd, thd->thread_id));
403
  
404
  if (thd->scheduler.init(thd))
405
  {
406
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
407
    pthread_mutex_unlock(&LOCK_thread_count);
408
    libevent_connection_close(thd);
409
    DBUG_VOID_RETURN;
410
  }
411
  threads.append(thd);
412
  libevent_thd_add(thd);
413
  
414
  pthread_mutex_unlock(&LOCK_thread_count);
415
  DBUG_VOID_RETURN;
416
}
417
418
419
/**
420
  @brief Signal a waiting connection it's time to die.
421
 
422
  @details This function will signal libevent the THD should be killed.
423
    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
424
    upon entry.
425
 
426
  @param[in]  thd The connection to kill
427
*/
428
429
static void libevent_post_kill_notification(THD *)
430
{
431
  /*
432
    Note, we just wake up libevent with an event that a THD should be killed,
433
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
434
    find the THDs it should kill.
435
    
436
    So we don't actually tell it which one and we don't actually use the
437
    THD being passed to us, but that's just a design detail that could change
438
    later.
439
  */
440
  char c= 0;
441
  write(thd_kill_pipe[1], &c, sizeof(c));
442
}
443
444
445
/*
446
  Close and delete a connection.
447
*/
448
449
static void libevent_connection_close(THD *thd)
450
{
451
  DBUG_ENTER("libevent_connection_close");
452
  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
453
454
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
455
456
  if (thd->net.vio->sd >= 0)                  // not already closed
457
  {
458
    end_connection(thd);
459
    close_connection(thd, 0, 1);
460
  }
461
  thd->scheduler.thread_detach();
462
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
463
  pthread_mutex_unlock(&LOCK_thread_count);
464
465
  DBUG_VOID_RETURN;
466
}
467
468
469
/*
470
  Returns true if we should close and delete a THD connection.
471
*/
472
473
static bool libevent_should_close_connection(THD* thd)
474
{
475
  return thd->net.error ||
476
         thd->net.vio == 0 ||
477
         thd->killed == THD::KILL_CONNECTION;
478
}
479
480
481
/*
482
  libevent_thread_proc is the outer loop of each thread in the thread pool.
483
  These procs only return/terminate on shutdown (kill_pool_threads == true).
484
*/
485
486
pthread_handler_t libevent_thread_proc(void *arg)
487
{
488
  if (init_new_connection_handler_thread())
489
  {
490
    my_thread_global_end();
491
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
492
    exit(1);
493
  }
494
  DBUG_ENTER("libevent_thread_proc");
495
496
  /*
497
    Signal libevent_init() when all threads has been created and are ready to
498
    receive events.
499
  */
500
  (void) pthread_mutex_lock(&LOCK_thread_count);
501
  created_threads++;
502
  if (created_threads == thread_pool_size)
503
    (void) pthread_cond_signal(&COND_thread_count);
504
  (void) pthread_mutex_unlock(&LOCK_thread_count);
505
  
506
  for (;;)
507
  {
508
    THD *thd= NULL;
509
    (void) pthread_mutex_lock(&LOCK_event_loop);
510
    
511
    /* get thd(s) to process */
512
    while (!thds_need_processing)
513
    {
514
      if (kill_pool_threads)
515
      {
516
        /* the flag that we should die has been set */
517
        (void) pthread_mutex_unlock(&LOCK_event_loop);
518
        goto thread_exit;
519
      }
520
      event_loop(EVLOOP_ONCE);
521
    }
522
    
523
    /* pop the first thd off the list */
524
    thd= (THD*)thds_need_processing->data;
525
    thds_need_processing= list_delete(thds_need_processing,
526
                                      thds_need_processing);
527
    
528
    (void) pthread_mutex_unlock(&LOCK_event_loop);
529
    
530
    /* now we process the connection (thd) */
531
    
532
    /* set up the thd<->thread links. */
533
    thd->thread_stack= (char*) &thd;
534
    
535
    if (thd->scheduler.thread_attach())
536
    {
537
      libevent_connection_close(thd);
538
      continue;
539
    }
540
541
    /* is the connection logged in yet? */
542
    if (!thd->scheduler.logged_in)
543
    {
544
      DBUG_PRINT("info", ("init new connection.  sd: %d",
545
                          thd->net.vio->sd));
546
      if (login_connection(thd))
547
      {
548
        /* Failed to log in */
549
        libevent_connection_close(thd);
550
        continue;
551
      }
552
      else
553
      {
554
        /* login successful */
55 by brian
Update for using real bool types.
555
        thd->scheduler.logged_in= true;
1 by brian
clean slate
556
        prepare_new_connection_state(thd);
557
        if (!libevent_needs_immediate_processing(thd))
558
          continue; /* New connection is now waiting for data in libevent*/
559
      }
560
    }
561
562
    do
563
    {
564
      /* Process a query */
565
      if (do_command(thd))
566
      {
567
        libevent_connection_close(thd);
568
        break;
569
      }
570
    } while (libevent_needs_immediate_processing(thd));
571
  }
572
  
573
thread_exit:
574
  DBUG_PRINT("exit", ("ending thread"));
575
  (void) pthread_mutex_lock(&LOCK_thread_count);
576
  killed_threads++;
577
  pthread_cond_broadcast(&COND_thread_count);
578
  (void) pthread_mutex_unlock(&LOCK_thread_count);
579
  my_thread_end();
580
  pthread_exit(0);
581
  DBUG_RETURN(0);                               /* purify: deadcode */
582
}
583
584
585
/*
55 by brian
Update for using real bool types.
586
  Returns true if the connection needs immediate processing and false if 
1 by brian
clean slate
587
  instead it's queued for libevent processing or closed,
588
*/
589
590
static bool libevent_needs_immediate_processing(THD *thd)
591
{
592
  if (libevent_should_close_connection(thd))
593
  {
594
    libevent_connection_close(thd);
55 by brian
Update for using real bool types.
595
    return false;
1 by brian
clean slate
596
  }
597
  /*
55 by brian
Update for using real bool types.
598
    If more data in the socket buffer, return true to process another command.
1 by brian
clean slate
599
600
    Note: we cannot add for event processing because the whole request might
601
    already be buffered and we wouldn't receive an event.
602
  */
603
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
55 by brian
Update for using real bool types.
604
    return true;
1 by brian
clean slate
605
  
606
  thd->scheduler.thread_detach();
607
  libevent_thd_add(thd);
55 by brian
Update for using real bool types.
608
  return false;
1 by brian
clean slate
609
}
610
611
612
/*
613
  Adds a THD to queued for libevent processing.
614
  
615
  This call does not actually register the event with libevent.
616
  Instead, it places the THD onto a queue and signals libevent by writing
617
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
618
  be invoked which will find the THD on the queue and add it to libevent.
619
*/
620
621
static void libevent_thd_add(THD* thd)
622
{
623
  char c=0;
624
  pthread_mutex_lock(&LOCK_thd_add);
625
  /* queue for libevent */
626
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
627
  /* notify libevent */
628
  write(thd_add_pipe[1], &c, sizeof(c));
629
  pthread_mutex_unlock(&LOCK_thd_add);
630
}
631
632
633
/**
634
  Wait until all pool threads have been deleted for clean shutdown
635
*/
636
637
static void libevent_end()
638
{
639
  DBUG_ENTER("libevent_end");
640
  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
641
                       created_threads, killed_threads));
642
  
643
  
644
  (void) pthread_mutex_lock(&LOCK_thread_count);
645
  
55 by brian
Update for using real bool types.
646
  kill_pool_threads= true;
1 by brian
clean slate
647
  while (killed_threads != created_threads)
648
  {
649
    /* wake up the event loop */
650
    char c= 0;
651
    write(thd_add_pipe[1], &c, sizeof(c));
652
653
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
654
  }
655
  (void) pthread_mutex_unlock(&LOCK_thread_count);
656
  
657
  event_del(&thd_add_event);
658
  close(thd_add_pipe[0]);
659
  close(thd_add_pipe[1]);
660
  event_del(&thd_kill_event);
661
  close(thd_kill_pipe[0]);
662
  close(thd_kill_pipe[1]);
663
664
  (void) pthread_mutex_destroy(&LOCK_event_loop);
665
  (void) pthread_mutex_destroy(&LOCK_thd_add);
666
  DBUG_VOID_RETURN;
667
}
668
669
670
/*
  Fill in a scheduler_functions table for the libevent based pool-of-threads
  scheduler. Hooks that are not assigned here keep whatever value the table
  already holds.

  @param func  The table to fill in.
*/
void pool_of_threads_scheduler(scheduler_functions* func)
{
  func->init= libevent_init;
  func->add_connection= libevent_add_connection;
  func->post_kill_notification= libevent_post_kill_notification;
  func->end= libevent_end;
  func->max_threads= thread_pool_size;
}