~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2007 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
/*
17
  Implementation for the thread scheduler
18
*/
19
20
#ifdef USE_PRAGMA_INTERFACE
21
#pragma implementation
22
#endif
23
24
#include <mysql_priv.h>
25
#include "event.h"
26
27
28
/*
29
  'Dummy' functions to be used when we don't need any handling for a scheduler
30
  event
31
 */
32
33
/* No-op scheduler init hook; always reports success (false). */
static bool init_dummy(void)
{
  return false;
}
212.1.3 by Monty Taylor
Renamed __attribute__((__unused__)) to __attribute__((unused)).
34
/* No-op post-kill hook: the THD being killed is ignored. */
static void post_kill_dummy(THD *thd __attribute__((unused)))
{
}
1 by brian
clean slate
35
/* No-op scheduler shutdown hook. */
static void end_dummy(void)
{
}
212.1.3 by Monty Taylor
Renamed __attribute__((__unused__)) to __attribute__((unused)).
36
static bool end_thread_dummy(THD *thd __attribute__((unused)),
37
                             bool cache_thread __attribute__((unused)))
77.1.45 by Monty Taylor
Warning fixes.
38
{ return 0; }
1 by brian
clean slate
39
40
/*
41
  Initialize default scheduler with dummy functions so that setup functions
42
  only need to declare those that are relvant for their usage
43
*/
44
45
scheduler_functions::scheduler_functions()
46
  :init(init_dummy),
47
   init_new_connection_thread(init_new_connection_handler_thread),
48
   add_connection(0),                           // Must be defined
49
   post_kill_notification(post_kill_dummy),
50
   end_thread(end_thread_dummy), end(end_dummy)
51
{}
52
53
static uint created_threads, killed_threads;
54
static bool kill_pool_threads;
55
56
static struct event thd_add_event;
57
static struct event thd_kill_event;
58
59
static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
60
static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
61
62
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
63
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
64
65
/*
66
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
67
  event_del) and thds_need_processing and thds_waiting_for_io.
68
*/
69
static pthread_mutex_t LOCK_event_loop;
70
static LIST *thds_need_processing; /* list of thds that needs some processing */
71
static LIST *thds_waiting_for_io; /* list of thds with added events */
72
73
pthread_handler_t libevent_thread_proc(void *arg);
74
static void libevent_end();
75
static bool libevent_needs_immediate_processing(THD *thd);
76
static void libevent_connection_close(THD *thd);
77
static bool libevent_should_close_connection(THD* thd);
78
static void libevent_thd_add(THD* thd);
79
void libevent_io_callback(int Fd, short Operation, void *ctx);
80
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
81
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
82
83
84
/*
  Create a pipe and switch BOTH of its ends to non-blocking mode.

  @param pipe_fds  Out: receives the read end in [0] and the write end in [1].

  @return false on success, true if pipe() or any fcntl() call failed.
          On failure no descriptor is left open; if the pipe had already
          been created, both entries of pipe_fds are reset to -1.
*/

static bool init_pipe(int pipe_fds[])
{
  if (pipe(pipe_fds) < 0)
    return true;

  /*
    Apply O_NONBLOCK to the read end and to the write end. (An earlier
    version terminated the return expression after the read end, so the
    write end silently stayed blocking.)
  */
  for (int i= 0; i < 2; i++)
  {
    int flags= fcntl(pipe_fds[i], F_GETFL);
    if (flags == -1 ||
        fcntl(pipe_fds[i], F_SETFL, flags | O_NONBLOCK) == -1)
    {
      /*
        The caller only sees the error flag, so release the descriptors
        here. Marking them -1 makes any later close() on them harmless.
      */
      close(pipe_fds[0]);
      close(pipe_fds[1]);
      pipe_fds[0]= pipe_fds[1]= -1;
      return true;
    }
  }
  return false;
}
98
99
100
/*
101
  thd_scheduler keeps the link between THD and events.
102
  It's embedded in the THD class.
103
*/
104
105
thd_scheduler::thd_scheduler()
55 by brian
Update for using real bool types.
106
  : logged_in(false), io_event(NULL), thread_attached(false)
1 by brian
clean slate
107
{  
108
  dbug_explain_buf[0]= 0;
109
}
110
111
112
/*
  Release the event structure allocated by init(). io_event is NULL when
  init() was never called; the MY_ALLOW_ZERO_PTR flag is passed for that
  case.
*/
thd_scheduler::~thd_scheduler()
{
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
}
116
117
118
/*
  Allocate the libevent read event for a connection and bind it to the
  connection's socket.

  @param parent_thd  The THD this scheduler object belongs to. It is stored
                     in list.data and passed to libevent_io_callback as ctx.

  @return false on success, true if the event could not be allocated.
*/
bool thd_scheduler::init(THD *parent_thd)
{
  io_event= (struct event*) my_malloc(sizeof(*io_event),
                                      MYF(MY_ZEROFILL|MY_WME));
  if (io_event == NULL)
  {
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
    return true;
  }

  /* Non-persistent read event on the client socket. */
  event_set(io_event, parent_thd->net.vio->sd, EV_READ,
            libevent_io_callback, (void*) parent_thd);

  /* The LIST node carries the THD through the scheduler queues. */
  list.data= parent_thd;

  return false;
}
136
137
138
/*
139
  Attach/associate the connection with the OS thread, for command processing.
140
*/
141
142
bool thd_scheduler::thread_attach()
143
{
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
144
  assert(!thread_attached);
1 by brian
clean slate
145
  THD* thd = (THD*)list.data;
146
  if (libevent_should_close_connection(thd) ||
147
      setup_connection_thread_globals(thd))
148
  {
55 by brian
Update for using real bool types.
149
    return true;
1 by brian
clean slate
150
  }
151
  my_errno= 0;
152
  thd->mysys_var->abort= 0;
55 by brian
Update for using real bool types.
153
  thread_attached= true;
1 by brian
clean slate
154
  swap_dbug_explain();
55 by brian
Update for using real bool types.
155
  return false;
1 by brian
clean slate
156
}
157
158
159
/*
160
  Detach/disassociate the connection with the OS thread.
161
*/
162
163
void thd_scheduler::thread_detach()
164
{
165
  if (thread_attached)
166
  {
167
    THD* thd = (THD*)list.data;
168
    thd->mysys_var= NULL;
55 by brian
Update for using real bool types.
169
    thread_attached= false;
1 by brian
clean slate
170
    swap_dbug_explain();
171
  }
172
}
173
174
175
/*
176
  Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
177
178
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
179
  thread during a command, but each command is handled by a different thread.
180
*/
181
void thd_scheduler::swap_dbug_explain()
182
{
183
  char buffer[sizeof(dbug_explain_buf)];
184
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
185
}
186
187
/**
188
  Create all threads for the thread pool
189
190
  NOTES
191
    After threads are created we wait until all threads has signaled that
192
    they have started before we return
193
194
  RETURN
195
    0  ok
196
    1  We got an error creating the thread pool
197
       In this case we will abort all created threads
198
*/
199
200
static bool libevent_init(void)
201
{
202
  uint i;
203
204
  event_init();
205
  
206
  created_threads= 0;
207
  killed_threads= 0;
55 by brian
Update for using real bool types.
208
  kill_pool_threads= false;
1 by brian
clean slate
209
  
210
  pthread_mutex_init(&LOCK_event_loop, NULL);
211
  pthread_mutex_init(&LOCK_thd_add, NULL);
212
  
213
  /* set up the pipe used to add new thds to the event pool */
214
  if (init_pipe(thd_add_pipe))
215
  {
216
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
217
    return(1);
1 by brian
clean slate
218
  }
219
  /* set up the pipe used to kill thds in the event queue */
220
  if (init_pipe(thd_kill_pipe))
221
  {
222
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
223
    close(thd_add_pipe[0]);
224
    close(thd_add_pipe[1]);
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
225
    return(1);
1 by brian
clean slate
226
  }
227
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
228
            libevent_add_thd_callback, NULL);
229
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
230
            libevent_kill_thd_callback, NULL);
231
 
232
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
233
 {
234
   sql_print_error("thd_add_event event_add error in libevent_init\n");
235
   libevent_end();
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
236
   return(1);
1 by brian
clean slate
237
   
238
 }
239
  /* Set up the thread pool */
240
  created_threads= killed_threads= 0;
241
  pthread_mutex_lock(&LOCK_thread_count);
242
243
  for (i= 0; i < thread_pool_size; i++)
244
  {
245
    pthread_t thread;
246
    int error;
247
    if ((error= pthread_create(&thread, &connection_attrib,
248
                               libevent_thread_proc, 0)))
249
    {
250
      sql_print_error("Can't create completion port thread (error %d)",
251
                      error);
252
      pthread_mutex_unlock(&LOCK_thread_count);
253
      libevent_end();                      // Cleanup
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
254
      return(true);
1 by brian
clean slate
255
    }
256
  }
257
258
  /* Wait until all threads are created */
259
  while (created_threads != thread_pool_size)
260
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
261
  pthread_mutex_unlock(&LOCK_thread_count);
262
  
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
263
  return(false);
1 by brian
clean slate
264
}
265
266
267
/*
268
  This is called when data is ready on the socket.
269
  
270
  NOTES
271
    This is only called by the thread that owns LOCK_event_loop.
272
  
273
    We add the thd that got the data to thds_need_processing, and 
274
    cause the libevent event_loop() to terminate. Then this same thread will
275
    return from event_loop and pick the thd value back up for processing.
276
*/
277
278
void libevent_io_callback(int, short, void *ctx)
279
{    
280
  safe_mutex_assert_owner(&LOCK_event_loop);
281
  THD *thd= (THD*)ctx;
282
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
283
  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
284
}
285
286
/*
287
  This is called when we have a thread we want to be killed.
288
  
289
  NOTES
290
    This is only called by the thread that owns LOCK_event_loop.
291
*/
292
293
void libevent_kill_thd_callback(int Fd, short, void*)
294
{    
295
  safe_mutex_assert_owner(&LOCK_event_loop);
296
297
  /* clear the pending events */
298
  char c;
299
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
300
  {}
301
302
  LIST* list= thds_waiting_for_io;
303
  while (list)
304
  {
305
    THD *thd= (THD*)list->data;
306
    list= list_rest(list);
307
    if (thd->killed == THD::KILL_CONNECTION)
308
    {
309
      /*
310
        Delete from libevent and add to the processing queue.
311
      */
312
      event_del(thd->scheduler.io_event);
313
      thds_waiting_for_io= list_delete(thds_waiting_for_io,
314
                                       &thd->scheduler.list);
315
      thds_need_processing= list_add(thds_need_processing,
316
                                     &thd->scheduler.list);
317
    }
318
  }
319
}
320
321
322
/*
323
  This is used to add connections to the pool. This callback is invoked from
324
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
325
  written to it.
326
  
327
  NOTES
328
    This is only called by the thread that owns LOCK_event_loop.
329
*/
330
331
void libevent_add_thd_callback(int Fd, short, void *)
332
{ 
333
  safe_mutex_assert_owner(&LOCK_event_loop);
334
335
  /* clear the pending events */
336
  char c;
337
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
338
  {}
339
340
  pthread_mutex_lock(&LOCK_thd_add);
341
  while (thds_need_adding)
342
  {
343
    /* pop the first thd off the list */
344
    THD* thd= (THD*)thds_need_adding->data;
345
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
346
347
    pthread_mutex_unlock(&LOCK_thd_add);
348
    
349
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
350
    {
351
      /*
352
        Add thd to thds_need_processing list. If it needs closing we'll close
353
        it outside of event_loop().
354
      */
355
      thds_need_processing= list_add(thds_need_processing,
356
                                     &thd->scheduler.list);
357
    }
358
    else
359
    {
360
      /* Add to libevent */
361
      if (event_add(thd->scheduler.io_event, NULL))
362
      {
363
        sql_print_error("event_add error in libevent_add_thd_callback\n");
364
        libevent_connection_close(thd);
365
      } 
366
      else
367
      {
368
        thds_waiting_for_io= list_add(thds_waiting_for_io,
369
                                      &thd->scheduler.list);
370
      }
371
    }
372
    pthread_mutex_lock(&LOCK_thd_add);
373
  }
374
  pthread_mutex_unlock(&LOCK_thd_add);
375
}
376
377
378
/**
379
  Notify the thread pool about a new connection
380
381
  NOTES
382
    LOCK_thread_count is locked on entry. This function MUST unlock it!
383
*/
384
385
static void libevent_add_connection(THD *thd)
386
{
387
  if (thd->scheduler.init(thd))
388
  {
389
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
390
    pthread_mutex_unlock(&LOCK_thread_count);
391
    libevent_connection_close(thd);
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
392
    return;
1 by brian
clean slate
393
  }
394
  threads.append(thd);
395
  libevent_thd_add(thd);
396
  
397
  pthread_mutex_unlock(&LOCK_thread_count);
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
398
  return;
1 by brian
clean slate
399
}
400
401
402
/**
403
  @brief Signal a waiting connection it's time to die.
404
 
405
  @details This function will signal libevent the THD should be killed.
406
    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
407
    upon entry.
408
 
409
  @param[in]  thd The connection to kill
410
*/
411
412
static void libevent_post_kill_notification(THD *)
413
{
414
  /*
415
    Note, we just wake up libevent with an event that a THD should be killed,
416
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
417
    find the THDs it should kill.
418
    
419
    So we don't actually tell it which one and we don't actually use the
420
    THD being passed to us, but that's just a design detail that could change
421
    later.
422
  */
423
  char c= 0;
424
  write(thd_kill_pipe[1], &c, sizeof(c));
425
}
426
427
428
/*
429
  Close and delete a connection.
430
*/
431
432
static void libevent_connection_close(THD *thd)
433
{
434
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
435
436
  if (thd->net.vio->sd >= 0)                  // not already closed
437
  {
438
    end_connection(thd);
439
    close_connection(thd, 0, 1);
440
  }
441
  thd->scheduler.thread_detach();
442
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
443
  pthread_mutex_unlock(&LOCK_thread_count);
444
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
445
  return;
1 by brian
clean slate
446
}
447
448
449
/*
450
  Returns true if we should close and delete a THD connection.
451
*/
452
453
static bool libevent_should_close_connection(THD* thd)
454
{
455
  return thd->net.error ||
456
         thd->net.vio == 0 ||
457
         thd->killed == THD::KILL_CONNECTION;
458
}
459
460
461
/*
462
  libevent_thread_proc is the outer loop of each thread in the thread pool.
463
  These procs only return/terminate on shutdown (kill_pool_threads == true).
464
*/
465
212.1.3 by Monty Taylor
Renamed __attribute__((__unused__)) to __attribute__((unused)).
466
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
1 by brian
clean slate
467
{
468
  if (init_new_connection_handler_thread())
469
  {
470
    my_thread_global_end();
471
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
472
    exit(1);
473
  }
474
475
  /*
476
    Signal libevent_init() when all threads has been created and are ready to
477
    receive events.
478
  */
479
  (void) pthread_mutex_lock(&LOCK_thread_count);
480
  created_threads++;
481
  if (created_threads == thread_pool_size)
482
    (void) pthread_cond_signal(&COND_thread_count);
483
  (void) pthread_mutex_unlock(&LOCK_thread_count);
484
  
485
  for (;;)
486
  {
487
    THD *thd= NULL;
488
    (void) pthread_mutex_lock(&LOCK_event_loop);
489
    
490
    /* get thd(s) to process */
491
    while (!thds_need_processing)
492
    {
493
      if (kill_pool_threads)
494
      {
495
        /* the flag that we should die has been set */
496
        (void) pthread_mutex_unlock(&LOCK_event_loop);
497
        goto thread_exit;
498
      }
499
      event_loop(EVLOOP_ONCE);
500
    }
501
    
502
    /* pop the first thd off the list */
503
    thd= (THD*)thds_need_processing->data;
504
    thds_need_processing= list_delete(thds_need_processing,
505
                                      thds_need_processing);
506
    
507
    (void) pthread_mutex_unlock(&LOCK_event_loop);
508
    
509
    /* now we process the connection (thd) */
510
    
511
    /* set up the thd<->thread links. */
512
    thd->thread_stack= (char*) &thd;
513
    
514
    if (thd->scheduler.thread_attach())
515
    {
516
      libevent_connection_close(thd);
517
      continue;
518
    }
519
520
    /* is the connection logged in yet? */
521
    if (!thd->scheduler.logged_in)
522
    {
523
      if (login_connection(thd))
524
      {
525
        /* Failed to log in */
526
        libevent_connection_close(thd);
527
        continue;
528
      }
529
      else
530
      {
531
        /* login successful */
55 by brian
Update for using real bool types.
532
        thd->scheduler.logged_in= true;
1 by brian
clean slate
533
        prepare_new_connection_state(thd);
534
        if (!libevent_needs_immediate_processing(thd))
535
          continue; /* New connection is now waiting for data in libevent*/
536
      }
537
    }
538
539
    do
540
    {
541
      /* Process a query */
542
      if (do_command(thd))
543
      {
544
        libevent_connection_close(thd);
545
        break;
546
      }
547
    } while (libevent_needs_immediate_processing(thd));
548
  }
549
  
550
thread_exit:
551
  (void) pthread_mutex_lock(&LOCK_thread_count);
552
  killed_threads++;
553
  pthread_cond_broadcast(&COND_thread_count);
554
  (void) pthread_mutex_unlock(&LOCK_thread_count);
555
  my_thread_end();
556
  pthread_exit(0);
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
557
  return(0);                               /* purify: deadcode */
1 by brian
clean slate
558
}
559
560
561
/*
55 by brian
Update for using real bool types.
562
  Returns true if the connection needs immediate processing and false if 
1 by brian
clean slate
563
  instead it's queued for libevent processing or closed,
564
*/
565
566
static bool libevent_needs_immediate_processing(THD *thd)
567
{
568
  if (libevent_should_close_connection(thd))
569
  {
570
    libevent_connection_close(thd);
55 by brian
Update for using real bool types.
571
    return false;
1 by brian
clean slate
572
  }
573
  /*
55 by brian
Update for using real bool types.
574
    If more data in the socket buffer, return true to process another command.
1 by brian
clean slate
575
576
    Note: we cannot add for event processing because the whole request might
577
    already be buffered and we wouldn't receive an event.
578
  */
579
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
55 by brian
Update for using real bool types.
580
    return true;
1 by brian
clean slate
581
  
582
  thd->scheduler.thread_detach();
583
  libevent_thd_add(thd);
55 by brian
Update for using real bool types.
584
  return false;
1 by brian
clean slate
585
}
586
587
588
/*
589
  Adds a THD to queued for libevent processing.
590
  
591
  This call does not actually register the event with libevent.
592
  Instead, it places the THD onto a queue and signals libevent by writing
593
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
594
  be invoked which will find the THD on the queue and add it to libevent.
595
*/
596
597
static void libevent_thd_add(THD* thd)
598
{
599
  char c=0;
600
  pthread_mutex_lock(&LOCK_thd_add);
601
  /* queue for libevent */
602
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
603
  /* notify libevent */
604
  write(thd_add_pipe[1], &c, sizeof(c));
605
  pthread_mutex_unlock(&LOCK_thd_add);
606
}
607
608
609
/**
610
  Wait until all pool threads have been deleted for clean shutdown
611
*/
612
613
static void libevent_end()
614
{
615
  (void) pthread_mutex_lock(&LOCK_thread_count);
616
  
55 by brian
Update for using real bool types.
617
  kill_pool_threads= true;
1 by brian
clean slate
618
  while (killed_threads != created_threads)
619
  {
620
    /* wake up the event loop */
621
    char c= 0;
622
    write(thd_add_pipe[1], &c, sizeof(c));
623
624
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
625
  }
626
  (void) pthread_mutex_unlock(&LOCK_thread_count);
627
  
628
  event_del(&thd_add_event);
629
  close(thd_add_pipe[0]);
630
  close(thd_add_pipe[1]);
631
  event_del(&thd_kill_event);
632
  close(thd_kill_pipe[0]);
633
  close(thd_kill_pipe[1]);
634
635
  (void) pthread_mutex_destroy(&LOCK_event_loop);
636
  (void) pthread_mutex_destroy(&LOCK_thd_add);
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
637
  return;
1 by brian
clean slate
638
}
639
640
641
/*
  Fill in a scheduler function table for the libevent-based thread pool.

  Only the hooks the pool implements are overridden; the remaining entries
  of *func are left as they were.

  @param func  Function table to set up.
*/
void pool_of_threads_scheduler(scheduler_functions* func)
{
  func->max_threads= thread_pool_size;

  func->init= libevent_init;
  func->add_connection= libevent_add_connection;
  func->post_kill_notification= libevent_post_kill_notification;
  func->end= libevent_end;
}