1
/* Copyright (C) 2007 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
Implementation for the thread scheduler
20
#include <drizzled/server_includes.h>
21
#include <libdrizzle/libdrizzle.h>
26
'Dummy' functions to be used when we don't need any handling for a scheduler
30
static bool init_dummy(void) {return 0;}
31
static void post_kill_dummy(THD *thd __attribute__((unused))) {}
32
static void end_dummy(void) {}
33
static bool end_thread_dummy(THD *thd __attribute__((unused)),
34
bool cache_thread __attribute__((unused)))
38
Initialize default scheduler with dummy functions so that setup functions
39
only need to declare those that are relvant for their usage
42
scheduler_functions::scheduler_functions()
44
init_new_connection_thread(init_new_connection_handler_thread),
45
add_connection(0), // Must be defined
46
post_kill_notification(post_kill_dummy),
47
end_thread(end_thread_dummy), end(end_dummy)
50
static uint32_t created_threads, killed_threads;
51
static bool kill_pool_threads;
53
static struct event thd_add_event;
54
static struct event thd_kill_event;
56
static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
57
static LIST *thds_need_adding; /* list of thds to add to libevent queue */
59
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
60
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
63
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
64
event_del) and thds_need_processing and thds_waiting_for_io.
66
static pthread_mutex_t LOCK_event_loop;
67
static LIST *thds_need_processing; /* list of thds that needs some processing */
68
static LIST *thds_waiting_for_io; /* list of thds with added events */
70
pthread_handler_t libevent_thread_proc(void *arg);
71
static void libevent_end();
72
static bool libevent_needs_immediate_processing(THD *thd);
73
static void libevent_connection_close(THD *thd);
74
static bool libevent_should_close_connection(THD* thd);
75
static void libevent_thd_add(THD* thd);
76
void libevent_io_callback(int Fd, short Operation, void *ctx);
77
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
78
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
82
Create a pipe and set to non-blocking.
83
Returns true if there is an error.
86
static bool init_pipe(int pipe_fds[])
89
return pipe(pipe_fds) < 0 ||
90
(flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
91
fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
92
(flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
93
fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
98
thd_scheduler keeps the link between THD and events.
99
It's embedded in the THD class.
102
thd_scheduler::thd_scheduler()
103
: logged_in(false), io_event(NULL), thread_attached(false)
105
dbug_explain_buf[0]= 0;
109
thd_scheduler::~thd_scheduler()
115
bool thd_scheduler::init(THD *parent_thd)
118
(struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
122
sql_print_error(_("Memory allocation error in thd_scheduler::init\n"));
126
event_set(io_event, net_get_sd(&(parent_thd->net)), EV_READ,
127
libevent_io_callback, (void*)parent_thd);
129
list.data= parent_thd;
136
Attach/associate the connection with the OS thread, for command processing.
139
bool thd_scheduler::thread_attach()
141
assert(!thread_attached);
142
THD* thd = (THD*)list.data;
143
if (libevent_should_close_connection(thd) ||
144
setup_connection_thread_globals(thd))
149
thd->mysys_var->abort= 0;
150
thread_attached= true;
157
Detach/disassociate the connection with the OS thread.
160
void thd_scheduler::thread_detach()
164
THD* thd = (THD*)list.data;
165
thd->mysys_var= NULL;
166
thread_attached= false;
173
Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
175
This is used to preserve the SESSION DEBUG variable, which is mapped to the OS
176
thread during a command, but each command is handled by a different thread.
178
void thd_scheduler::swap_dbug_explain()
180
char buffer[sizeof(dbug_explain_buf)];
181
memcpy(dbug_explain_buf, buffer, sizeof(buffer));
185
Create all threads for the thread pool
188
After threads are created we wait until all threads has signaled that
189
they have started before we return
193
1 We got an error creating the thread pool
194
In this case we will abort all created threads
197
static bool libevent_init(void)
205
kill_pool_threads= false;
207
pthread_mutex_init(&LOCK_event_loop, NULL);
208
pthread_mutex_init(&LOCK_thd_add, NULL);
210
/* set up the pipe used to add new thds to the event pool */
211
if (init_pipe(thd_add_pipe))
213
sql_print_error(_("init_pipe(thd_add_pipe) error in libevent_init\n"));
216
/* set up the pipe used to kill thds in the event queue */
217
if (init_pipe(thd_kill_pipe))
219
sql_print_error(_("init_pipe(thd_kill_pipe) error in libevent_init\n"));
220
close(thd_add_pipe[0]);
221
close(thd_add_pipe[1]);
224
event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
225
libevent_add_thd_callback, NULL);
226
event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
227
libevent_kill_thd_callback, NULL);
229
if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
231
sql_print_error(_("thd_add_event event_add error in libevent_init\n"));
236
/* Set up the thread pool */
237
created_threads= killed_threads= 0;
238
pthread_mutex_lock(&LOCK_thread_count);
240
for (i= 0; i < thread_pool_size; i++)
244
if ((error= pthread_create(&thread, &connection_attrib,
245
libevent_thread_proc, 0)))
247
sql_print_error(_("Can't create completion port thread (error %d)"),
249
pthread_mutex_unlock(&LOCK_thread_count);
250
libevent_end(); // Cleanup
255
/* Wait until all threads are created */
256
while (created_threads != thread_pool_size)
257
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
258
pthread_mutex_unlock(&LOCK_thread_count);
265
This is called when data is ready on the socket.
268
This is only called by the thread that owns LOCK_event_loop.
270
We add the thd that got the data to thds_need_processing, and
271
cause the libevent event_loop() to terminate. Then this same thread will
272
return from event_loop and pick the thd value back up for processing.
275
void libevent_io_callback(int, short, void *ctx)
277
safe_mutex_assert_owner(&LOCK_event_loop);
279
thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
280
thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
284
This is called when we have a thread we want to be killed.
287
This is only called by the thread that owns LOCK_event_loop.
290
void libevent_kill_thd_callback(int Fd, short, void*)
292
safe_mutex_assert_owner(&LOCK_event_loop);
294
/* clear the pending events */
296
while (read(Fd, &c, sizeof(c)) == sizeof(c))
299
LIST* list= thds_waiting_for_io;
302
THD *thd= (THD*)list->data;
303
list= list_rest(list);
304
if (thd->killed == THD::KILL_CONNECTION)
307
Delete from libevent and add to the processing queue.
309
event_del(thd->scheduler.io_event);
310
thds_waiting_for_io= list_delete(thds_waiting_for_io,
311
&thd->scheduler.list);
312
thds_need_processing= list_add(thds_need_processing,
313
&thd->scheduler.list);
320
This is used to add connections to the pool. This callback is invoked from
321
the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
325
This is only called by the thread that owns LOCK_event_loop.
328
void libevent_add_thd_callback(int Fd, short, void *)
330
safe_mutex_assert_owner(&LOCK_event_loop);
332
/* clear the pending events */
334
while (read(Fd, &c, sizeof(c)) == sizeof(c))
337
pthread_mutex_lock(&LOCK_thd_add);
338
while (thds_need_adding)
340
/* pop the first thd off the list */
341
THD* thd= (THD*)thds_need_adding->data;
342
thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
344
pthread_mutex_unlock(&LOCK_thd_add);
346
if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
349
Add thd to thds_need_processing list. If it needs closing we'll close
350
it outside of event_loop().
352
thds_need_processing= list_add(thds_need_processing,
353
&thd->scheduler.list);
357
/* Add to libevent */
358
if (event_add(thd->scheduler.io_event, NULL))
360
sql_print_error(_("event_add error in libevent_add_thd_callback\n"));
361
libevent_connection_close(thd);
365
thds_waiting_for_io= list_add(thds_waiting_for_io,
366
&thd->scheduler.list);
369
pthread_mutex_lock(&LOCK_thd_add);
371
pthread_mutex_unlock(&LOCK_thd_add);
376
Notify the thread pool about a new connection
379
LOCK_thread_count is locked on entry. This function MUST unlock it!
382
static void libevent_add_connection(THD *thd)
384
if (thd->scheduler.init(thd))
386
sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
387
pthread_mutex_unlock(&LOCK_thread_count);
388
libevent_connection_close(thd);
392
libevent_thd_add(thd);
394
pthread_mutex_unlock(&LOCK_thread_count);
400
@brief Signal a waiting connection it's time to die.
402
@details This function will signal libevent the THD should be killed.
403
Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
406
@param[in] thd The connection to kill
409
static void libevent_post_kill_notification(THD *)
412
Note, we just wake up libevent with an event that a THD should be killed,
413
It will search its list of thds for thd->killed == KILL_CONNECTION to
414
find the THDs it should kill.
416
So we don't actually tell it which one and we don't actually use the
417
THD being passed to us, but that's just a design detail that could change
421
write(thd_kill_pipe[1], &c, sizeof(c));
426
Close and delete a connection.
429
static void libevent_connection_close(THD *thd)
431
thd->killed= THD::KILL_CONNECTION; // Avoid error messages
433
if (net_get_sd(&(thd->net)) >= 0) // not already closed
436
close_connection(thd, 0, 1);
438
thd->scheduler.thread_detach();
439
unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
440
pthread_mutex_unlock(&LOCK_thread_count);
447
Returns true if we should close and delete a THD connection.
450
static bool libevent_should_close_connection(THD* thd)
452
return net_should_close(&(thd->net)) ||
453
thd->killed == THD::KILL_CONNECTION;
458
libevent_thread_proc is the outer loop of each thread in the thread pool.
459
These procs only return/terminate on shutdown (kill_pool_threads == true).
462
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
464
if (init_new_connection_handler_thread())
466
my_thread_global_end();
467
sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
472
Signal libevent_init() when all threads has been created and are ready to
475
(void) pthread_mutex_lock(&LOCK_thread_count);
477
if (created_threads == thread_pool_size)
478
(void) pthread_cond_signal(&COND_thread_count);
479
(void) pthread_mutex_unlock(&LOCK_thread_count);
484
(void) pthread_mutex_lock(&LOCK_event_loop);
486
/* get thd(s) to process */
487
while (!thds_need_processing)
489
if (kill_pool_threads)
491
/* the flag that we should die has been set */
492
(void) pthread_mutex_unlock(&LOCK_event_loop);
495
event_loop(EVLOOP_ONCE);
498
/* pop the first thd off the list */
499
thd= (THD*)thds_need_processing->data;
500
thds_need_processing= list_delete(thds_need_processing,
501
thds_need_processing);
503
(void) pthread_mutex_unlock(&LOCK_event_loop);
505
/* now we process the connection (thd) */
507
/* set up the thd<->thread links. */
508
thd->thread_stack= (char*) &thd;
510
if (thd->scheduler.thread_attach())
512
libevent_connection_close(thd);
516
/* is the connection logged in yet? */
517
if (!thd->scheduler.logged_in)
519
if (login_connection(thd))
521
/* Failed to log in */
522
libevent_connection_close(thd);
527
/* login successful */
528
thd->scheduler.logged_in= true;
529
prepare_new_connection_state(thd);
530
if (!libevent_needs_immediate_processing(thd))
531
continue; /* New connection is now waiting for data in libevent*/
537
/* Process a query */
540
libevent_connection_close(thd);
543
} while (libevent_needs_immediate_processing(thd));
547
(void) pthread_mutex_lock(&LOCK_thread_count);
549
pthread_cond_broadcast(&COND_thread_count);
550
(void) pthread_mutex_unlock(&LOCK_thread_count);
553
return(0); /* purify: deadcode */
558
Returns true if the connection needs immediate processing and false if
559
instead it's queued for libevent processing or closed,
562
static bool libevent_needs_immediate_processing(THD *thd)
564
if (libevent_should_close_connection(thd))
566
libevent_connection_close(thd);
570
If more data in the socket buffer, return true to process another command.
572
Note: we cannot add for event processing because the whole request might
573
already be buffered and we wouldn't receive an event.
575
if (net_more_data(&(thd->net)))
578
thd->scheduler.thread_detach();
579
libevent_thd_add(thd);
585
Adds a THD to queued for libevent processing.
587
This call does not actually register the event with libevent.
588
Instead, it places the THD onto a queue and signals libevent by writing
589
a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
590
be invoked which will find the THD on the queue and add it to libevent.
593
static void libevent_thd_add(THD* thd)
596
pthread_mutex_lock(&LOCK_thd_add);
597
/* queue for libevent */
598
thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
599
/* notify libevent */
600
write(thd_add_pipe[1], &c, sizeof(c));
601
pthread_mutex_unlock(&LOCK_thd_add);
606
Wait until all pool threads have been deleted for clean shutdown
609
static void libevent_end()
611
(void) pthread_mutex_lock(&LOCK_thread_count);
613
kill_pool_threads= true;
614
while (killed_threads != created_threads)
616
/* wake up the event loop */
618
write(thd_add_pipe[1], &c, sizeof(c));
620
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
622
(void) pthread_mutex_unlock(&LOCK_thread_count);
624
event_del(&thd_add_event);
625
close(thd_add_pipe[0]);
626
close(thd_add_pipe[1]);
627
event_del(&thd_kill_event);
628
close(thd_kill_pipe[0]);
629
close(thd_kill_pipe[1]);
631
(void) pthread_mutex_destroy(&LOCK_event_loop);
632
(void) pthread_mutex_destroy(&LOCK_thd_add);
637
void pool_of_threads_scheduler(scheduler_functions* func)
639
func->max_threads= thread_pool_size;
640
func->init= libevent_init;
641
func->end= libevent_end;
642
func->post_kill_notification= libevent_post_kill_notification;
643
func->add_connection= libevent_add_connection;