~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2007 MySQL AB
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
/*
17
  Implementation for the thread scheduler
18
*/
19
20
#ifdef USE_PRAGMA_INTERFACE
21
#pragma implementation
22
#endif
23
24
#include <mysql_priv.h>
25
#include "event.h"
26
27
28
/*
29
  'Dummy' functions to be used when we don't need any handling for a scheduler
30
  event
31
 */
32
33
static bool init_dummy(void) {return 0;}
34
static void post_kill_dummy(THD *thd) {}  
35
static void end_dummy(void) {}
36
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
37
38
/*
39
  Initialize default scheduler with dummy functions so that setup functions
40
  only need to declare those that are relvant for their usage
41
*/
42
43
scheduler_functions::scheduler_functions()
44
  :init(init_dummy),
45
   init_new_connection_thread(init_new_connection_handler_thread),
46
   add_connection(0),                           // Must be defined
47
   post_kill_notification(post_kill_dummy),
48
   end_thread(end_thread_dummy), end(end_dummy)
49
{}
50
51
52
/*
53
  End connection, in case when we are using 'no-threads'
54
*/
55
56
static bool no_threads_end(THD *thd, bool put_in_cache)
57
{
58
  unlink_thd(thd);
59
  pthread_mutex_unlock(&LOCK_thread_count);
60
  return 1;                                     // Abort handle_one_connection
61
}
62
63
64
/*
65
  Initailize scheduler for --thread-handling=no-threads
66
*/
67
68
void one_thread_scheduler(scheduler_functions *func)
69
{
70
  func->max_threads= 1;
71
  func->add_connection= handle_connection_in_main_thread;
72
  func->init_new_connection_thread= init_dummy;
73
  func->end_thread= no_threads_end;
74
}
75
76
77
/*
78
  Initialize scheduler for --thread-handling=one-thread-per-connection
79
*/
80
81
void one_thread_per_connection_scheduler(scheduler_functions *func)
82
{
83
  func->max_threads= max_connections;
84
  func->add_connection= create_thread_to_handle_connection;
85
  func->end_thread= one_thread_per_connection_end;
86
}
87
88
static uint created_threads, killed_threads;
89
static bool kill_pool_threads;
90
91
static struct event thd_add_event;
92
static struct event thd_kill_event;
93
94
static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
95
static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
96
97
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
98
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
99
100
/*
101
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
102
  event_del) and thds_need_processing and thds_waiting_for_io.
103
*/
104
static pthread_mutex_t LOCK_event_loop;
105
static LIST *thds_need_processing; /* list of thds that needs some processing */
106
static LIST *thds_waiting_for_io; /* list of thds with added events */
107
108
pthread_handler_t libevent_thread_proc(void *arg);
109
static void libevent_end();
110
static bool libevent_needs_immediate_processing(THD *thd);
111
static void libevent_connection_close(THD *thd);
112
static bool libevent_should_close_connection(THD* thd);
113
static void libevent_thd_add(THD* thd);
114
void libevent_io_callback(int Fd, short Operation, void *ctx);
115
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
116
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
117
118
119
/*
  Create a pipe and set both ends to non-blocking.

  SYNOPSIS
    init_pipe()
    pipe_fds   Array of two ints; receives the read end in [0] and the
               write end in [1].

  NOTES
    If the pipe is created but one of the fcntl() calls fails, both
    descriptors are closed again so that nothing leaks; the contents of
    pipe_fds must then not be used.

  RETURN
    false  ok, both descriptors are open and have O_NONBLOCK set
    true   error
*/

static bool init_pipe(int pipe_fds[])
{
  int i;

  if (pipe(pipe_fds) < 0)
    return true;

  /* Same treatment for the read end (0) and the write end (1) */
  for (i= 0; i < 2; i++)
  {
    int flags= fcntl(pipe_fds[i], F_GETFL);
    if (flags == -1 ||
        fcntl(pipe_fds[i], F_SETFL, flags | O_NONBLOCK) == -1)
    {
      close(pipe_fds[0]);
      close(pipe_fds[1]);
      return true;
    }
  }
  return false;
}
133
134
135
/*
136
  thd_scheduler keeps the link between THD and events.
137
  It's embedded in the THD class.
138
*/
139
140
thd_scheduler::thd_scheduler()
141
  : logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
142
{  
143
#ifndef DBUG_OFF
144
  dbug_explain_buf[0]= 0;
145
#endif
146
}
147
148
149
/*
  Release the io event. io_event may still be NULL here (it is only set by
  init()), hence MY_ALLOW_ZERO_PTR.
*/
thd_scheduler::~thd_scheduler()
{
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
}
153
154
155
/*
  Allocate the io event for a connection and bind it to the connection's
  socket.

  The event is set up for EV_READ on parent_thd->net.vio->sd with
  libevent_io_callback as callback and the THD as callback argument; it is
  not added to libevent here. The THD is also stored as the data of the
  scheduler's list node.

  RETURN
    FALSE  ok
    TRUE   memory allocation failed
*/
bool thd_scheduler::init(THD *parent_thd)
{
  void *event_mem= my_malloc(sizeof(*io_event), MYF(MY_ZEROFILL|MY_WME));

  io_event= (struct event*) event_mem;
  if (io_event == NULL)
  {
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
    return TRUE;
  }

  event_set(io_event, parent_thd->net.vio->sd, EV_READ,
            libevent_io_callback, (void*) parent_thd);
  list.data= parent_thd;

  return FALSE;
}
173
174
175
/*
176
  Attach/associate the connection with the OS thread, for command processing.
177
*/
178
179
bool thd_scheduler::thread_attach()
180
{
181
  DBUG_ASSERT(!thread_attached);
182
  THD* thd = (THD*)list.data;
183
  if (libevent_should_close_connection(thd) ||
184
      setup_connection_thread_globals(thd))
185
  {
186
    return TRUE;
187
  }
188
  my_errno= 0;
189
  thd->mysys_var->abort= 0;
190
  thread_attached= TRUE;
191
#ifndef DBUG_OFF
192
  swap_dbug_explain();
193
#endif
194
  return FALSE;
195
}
196
197
198
/*
199
  Detach/disassociate the connection with the OS thread.
200
*/
201
202
void thd_scheduler::thread_detach()
203
{
204
  if (thread_attached)
205
  {
206
    THD* thd = (THD*)list.data;
207
    thd->mysys_var= NULL;
208
    thread_attached= FALSE;
209
#ifndef DBUG_OFF
210
    swap_dbug_explain();
211
#endif
212
  }
213
}
214
215
216
/*
217
  Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
218
219
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
220
  thread during a command, but each command is handled by a different thread.
221
*/
222
223
#ifndef DBUG_OFF
224
void thd_scheduler::swap_dbug_explain()
225
{
226
  char buffer[sizeof(dbug_explain_buf)];
227
  if (DBUG_EXPLAIN(buffer, sizeof(buffer)))
228
    sql_print_error("DBUG_EXPLAIN buffer too small.\n");
229
  DBUG_POP();
230
  DBUG_PUSH(dbug_explain_buf);
231
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
232
}
233
#endif
234
235
/**
236
  Create all threads for the thread pool
237
238
  NOTES
239
    After threads are created we wait until all threads has signaled that
240
    they have started before we return
241
242
  RETURN
243
    0  ok
244
    1  We got an error creating the thread pool
245
       In this case we will abort all created threads
246
*/
247
248
static bool libevent_init(void)
249
{
250
  uint i;
251
  DBUG_ENTER("libevent_init");
252
253
  event_init();
254
  
255
  created_threads= 0;
256
  killed_threads= 0;
257
  kill_pool_threads= FALSE;
258
  
259
  pthread_mutex_init(&LOCK_event_loop, NULL);
260
  pthread_mutex_init(&LOCK_thd_add, NULL);
261
  
262
  /* set up the pipe used to add new thds to the event pool */
263
  if (init_pipe(thd_add_pipe))
264
  {
265
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
266
    DBUG_RETURN(1);
267
  }
268
  /* set up the pipe used to kill thds in the event queue */
269
  if (init_pipe(thd_kill_pipe))
270
  {
271
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
272
    close(thd_add_pipe[0]);
273
    close(thd_add_pipe[1]);
274
    DBUG_RETURN(1);
275
  }
276
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
277
            libevent_add_thd_callback, NULL);
278
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
279
            libevent_kill_thd_callback, NULL);
280
 
281
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
282
 {
283
   sql_print_error("thd_add_event event_add error in libevent_init\n");
284
   libevent_end();
285
   DBUG_RETURN(1);
286
   
287
 }
288
  /* Set up the thread pool */
289
  created_threads= killed_threads= 0;
290
  pthread_mutex_lock(&LOCK_thread_count);
291
292
  for (i= 0; i < thread_pool_size; i++)
293
  {
294
    pthread_t thread;
295
    int error;
296
    if ((error= pthread_create(&thread, &connection_attrib,
297
                               libevent_thread_proc, 0)))
298
    {
299
      sql_print_error("Can't create completion port thread (error %d)",
300
                      error);
301
      pthread_mutex_unlock(&LOCK_thread_count);
302
      libevent_end();                      // Cleanup
303
      DBUG_RETURN(TRUE);
304
    }
305
  }
306
307
  /* Wait until all threads are created */
308
  while (created_threads != thread_pool_size)
309
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
310
  pthread_mutex_unlock(&LOCK_thread_count);
311
  
312
  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
313
  DBUG_RETURN(FALSE);
314
}
315
316
317
/*
318
  This is called when data is ready on the socket.
319
  
320
  NOTES
321
    This is only called by the thread that owns LOCK_event_loop.
322
  
323
    We add the thd that got the data to thds_need_processing, and 
324
    cause the libevent event_loop() to terminate. Then this same thread will
325
    return from event_loop and pick the thd value back up for processing.
326
*/
327
328
void libevent_io_callback(int, short, void *ctx)
329
{    
330
  safe_mutex_assert_owner(&LOCK_event_loop);
331
  THD *thd= (THD*)ctx;
332
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
333
  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
334
}
335
336
/*
337
  This is called when we have a thread we want to be killed.
338
  
339
  NOTES
340
    This is only called by the thread that owns LOCK_event_loop.
341
*/
342
343
void libevent_kill_thd_callback(int Fd, short, void*)
344
{    
345
  safe_mutex_assert_owner(&LOCK_event_loop);
346
347
  /* clear the pending events */
348
  char c;
349
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
350
  {}
351
352
  LIST* list= thds_waiting_for_io;
353
  while (list)
354
  {
355
    THD *thd= (THD*)list->data;
356
    list= list_rest(list);
357
    if (thd->killed == THD::KILL_CONNECTION)
358
    {
359
      /*
360
        Delete from libevent and add to the processing queue.
361
      */
362
      event_del(thd->scheduler.io_event);
363
      thds_waiting_for_io= list_delete(thds_waiting_for_io,
364
                                       &thd->scheduler.list);
365
      thds_need_processing= list_add(thds_need_processing,
366
                                     &thd->scheduler.list);
367
    }
368
  }
369
}
370
371
372
/*
373
  This is used to add connections to the pool. This callback is invoked from
374
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
375
  written to it.
376
  
377
  NOTES
378
    This is only called by the thread that owns LOCK_event_loop.
379
*/
380
381
void libevent_add_thd_callback(int Fd, short, void *)
382
{ 
383
  safe_mutex_assert_owner(&LOCK_event_loop);
384
385
  /* clear the pending events */
386
  char c;
387
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
388
  {}
389
390
  pthread_mutex_lock(&LOCK_thd_add);
391
  while (thds_need_adding)
392
  {
393
    /* pop the first thd off the list */
394
    THD* thd= (THD*)thds_need_adding->data;
395
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
396
397
    pthread_mutex_unlock(&LOCK_thd_add);
398
    
399
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
400
    {
401
      /*
402
        Add thd to thds_need_processing list. If it needs closing we'll close
403
        it outside of event_loop().
404
      */
405
      thds_need_processing= list_add(thds_need_processing,
406
                                     &thd->scheduler.list);
407
    }
408
    else
409
    {
410
      /* Add to libevent */
411
      if (event_add(thd->scheduler.io_event, NULL))
412
      {
413
        sql_print_error("event_add error in libevent_add_thd_callback\n");
414
        libevent_connection_close(thd);
415
      } 
416
      else
417
      {
418
        thds_waiting_for_io= list_add(thds_waiting_for_io,
419
                                      &thd->scheduler.list);
420
      }
421
    }
422
    pthread_mutex_lock(&LOCK_thd_add);
423
  }
424
  pthread_mutex_unlock(&LOCK_thd_add);
425
}
426
427
428
/**
429
  Notify the thread pool about a new connection
430
431
  NOTES
432
    LOCK_thread_count is locked on entry. This function MUST unlock it!
433
*/
434
435
static void libevent_add_connection(THD *thd)
436
{
437
  DBUG_ENTER("libevent_add_connection");
438
  DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
439
                       (long) thd, thd->thread_id));
440
  
441
  if (thd->scheduler.init(thd))
442
  {
443
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
444
    pthread_mutex_unlock(&LOCK_thread_count);
445
    libevent_connection_close(thd);
446
    DBUG_VOID_RETURN;
447
  }
448
  threads.append(thd);
449
  libevent_thd_add(thd);
450
  
451
  pthread_mutex_unlock(&LOCK_thread_count);
452
  DBUG_VOID_RETURN;
453
}
454
455
456
/**
457
  @brief Signal a waiting connection it's time to die.
458
 
459
  @details This function will signal libevent the THD should be killed.
460
    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
461
    upon entry.
462
 
463
  @param[in]  thd The connection to kill
464
*/
465
466
static void libevent_post_kill_notification(THD *)
467
{
468
  /*
469
    Note, we just wake up libevent with an event that a THD should be killed,
470
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
471
    find the THDs it should kill.
472
    
473
    So we don't actually tell it which one and we don't actually use the
474
    THD being passed to us, but that's just a design detail that could change
475
    later.
476
  */
477
  char c= 0;
478
  write(thd_kill_pipe[1], &c, sizeof(c));
479
}
480
481
482
/*
483
  Close and delete a connection.
484
*/
485
486
static void libevent_connection_close(THD *thd)
487
{
488
  DBUG_ENTER("libevent_connection_close");
489
  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
490
491
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
492
493
  if (thd->net.vio->sd >= 0)                  // not already closed
494
  {
495
    end_connection(thd);
496
    close_connection(thd, 0, 1);
497
  }
498
  thd->scheduler.thread_detach();
499
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
500
  pthread_mutex_unlock(&LOCK_thread_count);
501
502
  DBUG_VOID_RETURN;
503
}
504
505
506
/*
507
  Returns true if we should close and delete a THD connection.
508
*/
509
510
static bool libevent_should_close_connection(THD* thd)
511
{
512
  return thd->net.error ||
513
         thd->net.vio == 0 ||
514
         thd->killed == THD::KILL_CONNECTION;
515
}
516
517
518
/*
519
  libevent_thread_proc is the outer loop of each thread in the thread pool.
520
  These procs only return/terminate on shutdown (kill_pool_threads == true).
521
*/
522
523
pthread_handler_t libevent_thread_proc(void *arg)
524
{
525
  if (init_new_connection_handler_thread())
526
  {
527
    my_thread_global_end();
528
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
529
    exit(1);
530
  }
531
  DBUG_ENTER("libevent_thread_proc");
532
533
  /*
534
    Signal libevent_init() when all threads has been created and are ready to
535
    receive events.
536
  */
537
  (void) pthread_mutex_lock(&LOCK_thread_count);
538
  created_threads++;
539
  if (created_threads == thread_pool_size)
540
    (void) pthread_cond_signal(&COND_thread_count);
541
  (void) pthread_mutex_unlock(&LOCK_thread_count);
542
  
543
  for (;;)
544
  {
545
    THD *thd= NULL;
546
    (void) pthread_mutex_lock(&LOCK_event_loop);
547
    
548
    /* get thd(s) to process */
549
    while (!thds_need_processing)
550
    {
551
      if (kill_pool_threads)
552
      {
553
        /* the flag that we should die has been set */
554
        (void) pthread_mutex_unlock(&LOCK_event_loop);
555
        goto thread_exit;
556
      }
557
      event_loop(EVLOOP_ONCE);
558
    }
559
    
560
    /* pop the first thd off the list */
561
    thd= (THD*)thds_need_processing->data;
562
    thds_need_processing= list_delete(thds_need_processing,
563
                                      thds_need_processing);
564
    
565
    (void) pthread_mutex_unlock(&LOCK_event_loop);
566
    
567
    /* now we process the connection (thd) */
568
    
569
    /* set up the thd<->thread links. */
570
    thd->thread_stack= (char*) &thd;
571
    
572
    if (thd->scheduler.thread_attach())
573
    {
574
      libevent_connection_close(thd);
575
      continue;
576
    }
577
578
    /* is the connection logged in yet? */
579
    if (!thd->scheduler.logged_in)
580
    {
581
      DBUG_PRINT("info", ("init new connection.  sd: %d",
582
                          thd->net.vio->sd));
583
      if (login_connection(thd))
584
      {
585
        /* Failed to log in */
586
        libevent_connection_close(thd);
587
        continue;
588
      }
589
      else
590
      {
591
        /* login successful */
592
        thd->scheduler.logged_in= TRUE;
593
        prepare_new_connection_state(thd);
594
        if (!libevent_needs_immediate_processing(thd))
595
          continue; /* New connection is now waiting for data in libevent*/
596
      }
597
    }
598
599
    do
600
    {
601
      /* Process a query */
602
      if (do_command(thd))
603
      {
604
        libevent_connection_close(thd);
605
        break;
606
      }
607
    } while (libevent_needs_immediate_processing(thd));
608
  }
609
  
610
thread_exit:
611
  DBUG_PRINT("exit", ("ending thread"));
612
  (void) pthread_mutex_lock(&LOCK_thread_count);
613
  killed_threads++;
614
  pthread_cond_broadcast(&COND_thread_count);
615
  (void) pthread_mutex_unlock(&LOCK_thread_count);
616
  my_thread_end();
617
  pthread_exit(0);
618
  DBUG_RETURN(0);                               /* purify: deadcode */
619
}
620
621
622
/*
623
  Returns TRUE if the connection needs immediate processing and FALSE if 
624
  instead it's queued for libevent processing or closed,
625
*/
626
627
static bool libevent_needs_immediate_processing(THD *thd)
628
{
629
  if (libevent_should_close_connection(thd))
630
  {
631
    libevent_connection_close(thd);
632
    return FALSE;
633
  }
634
  /*
635
    If more data in the socket buffer, return TRUE to process another command.
636
637
    Note: we cannot add for event processing because the whole request might
638
    already be buffered and we wouldn't receive an event.
639
  */
640
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
641
    return TRUE;
642
  
643
  thd->scheduler.thread_detach();
644
  libevent_thd_add(thd);
645
  return FALSE;
646
}
647
648
649
/*
650
  Adds a THD to queued for libevent processing.
651
  
652
  This call does not actually register the event with libevent.
653
  Instead, it places the THD onto a queue and signals libevent by writing
654
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
655
  be invoked which will find the THD on the queue and add it to libevent.
656
*/
657
658
static void libevent_thd_add(THD* thd)
659
{
660
  char c=0;
661
  pthread_mutex_lock(&LOCK_thd_add);
662
  /* queue for libevent */
663
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
664
  /* notify libevent */
665
  write(thd_add_pipe[1], &c, sizeof(c));
666
  pthread_mutex_unlock(&LOCK_thd_add);
667
}
668
669
670
/**
671
  Wait until all pool threads have been deleted for clean shutdown
672
*/
673
674
static void libevent_end()
675
{
676
  DBUG_ENTER("libevent_end");
677
  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
678
                       created_threads, killed_threads));
679
  
680
  
681
  (void) pthread_mutex_lock(&LOCK_thread_count);
682
  
683
  kill_pool_threads= TRUE;
684
  while (killed_threads != created_threads)
685
  {
686
    /* wake up the event loop */
687
    char c= 0;
688
    write(thd_add_pipe[1], &c, sizeof(c));
689
690
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
691
  }
692
  (void) pthread_mutex_unlock(&LOCK_thread_count);
693
  
694
  event_del(&thd_add_event);
695
  close(thd_add_pipe[0]);
696
  close(thd_add_pipe[1]);
697
  event_del(&thd_kill_event);
698
  close(thd_kill_pipe[0]);
699
  close(thd_kill_pipe[1]);
700
701
  (void) pthread_mutex_destroy(&LOCK_event_loop);
702
  (void) pthread_mutex_destroy(&LOCK_thd_add);
703
  DBUG_VOID_RETURN;
704
}
705
706
707
/*
  Initialize scheduler for the libevent based pool of threads.

  The thread limit follows thread_pool_size, and the init, end, post-kill
  notification and add-connection callbacks are set to their libevent
  implementations.
*/
void pool_of_threads_scheduler(scheduler_functions* func)
{
  /* Callbacks */
  func->init= libevent_init;
  func->add_connection= libevent_add_connection;
  func->post_kill_notification= libevent_post_kill_notification;
  func->end= libevent_end;

  /* Limits */
  func->max_threads= thread_pool_size;
}