1
/* Copyright (C) 2007 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
Implementation for the thread scheduler
20
#ifdef USE_PRAGMA_INTERFACE
21
#pragma implementation
24
#include <mysql_priv.h>
29
'Dummy' functions to be used when we don't need any handling for a scheduler
33
static bool init_dummy(void) {return 0;}
34
static void post_kill_dummy(THD *thd __attribute__((__unused__))) {}
35
static void end_dummy(void) {}
36
static bool end_thread_dummy(THD *thd __attribute__((__unused__)),
37
bool cache_thread __attribute__((__unused__)))
41
Initialize default scheduler with dummy functions so that setup functions
42
only need to declare those that are relvant for their usage
45
scheduler_functions::scheduler_functions()
47
init_new_connection_thread(init_new_connection_handler_thread),
48
add_connection(0), // Must be defined
49
post_kill_notification(post_kill_dummy),
50
end_thread(end_thread_dummy), end(end_dummy)
53
static uint created_threads, killed_threads;
54
static bool kill_pool_threads;
56
static struct event thd_add_event;
57
static struct event thd_kill_event;
59
static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
60
static LIST *thds_need_adding; /* list of thds to add to libevent queue */
62
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
63
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
66
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
67
event_del) and thds_need_processing and thds_waiting_for_io.
69
static pthread_mutex_t LOCK_event_loop;
70
static LIST *thds_need_processing; /* list of thds that needs some processing */
71
static LIST *thds_waiting_for_io; /* list of thds with added events */
73
pthread_handler_t libevent_thread_proc(void *arg);
74
static void libevent_end();
75
static bool libevent_needs_immediate_processing(THD *thd);
76
static void libevent_connection_close(THD *thd);
77
static bool libevent_should_close_connection(THD* thd);
78
static void libevent_thd_add(THD* thd);
79
void libevent_io_callback(int Fd, short Operation, void *ctx);
80
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
81
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
85
Create a pipe and set to non-blocking.
86
Returns true if there is an error.
89
static bool init_pipe(int pipe_fds[])
92
return pipe(pipe_fds) < 0 ||
93
(flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
94
fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
95
(flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
96
fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
101
thd_scheduler keeps the link between THD and events.
102
It's embedded in the THD class.
105
thd_scheduler::thd_scheduler()
106
: logged_in(false), io_event(NULL), thread_attached(false)
108
dbug_explain_buf[0]= 0;
112
thd_scheduler::~thd_scheduler()
114
my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
118
bool thd_scheduler::init(THD *parent_thd)
121
(struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
125
sql_print_error("Memory allocation error in thd_scheduler::init\n");
129
event_set(io_event, parent_thd->net.vio->sd, EV_READ,
130
libevent_io_callback, (void*)parent_thd);
132
list.data= parent_thd;
139
Attach/associate the connection with the OS thread, for command processing.
142
bool thd_scheduler::thread_attach()
144
assert(!thread_attached);
145
THD* thd = (THD*)list.data;
146
if (libevent_should_close_connection(thd) ||
147
setup_connection_thread_globals(thd))
152
thd->mysys_var->abort= 0;
153
thread_attached= true;
160
Detach/disassociate the connection with the OS thread.
163
void thd_scheduler::thread_detach()
167
THD* thd = (THD*)list.data;
168
thd->mysys_var= NULL;
169
thread_attached= false;
176
Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
178
This is used to preserve the SESSION DEBUG variable, which is mapped to the OS
179
thread during a command, but each command is handled by a different thread.
181
void thd_scheduler::swap_dbug_explain()
183
char buffer[sizeof(dbug_explain_buf)];
184
memcpy(dbug_explain_buf, buffer, sizeof(buffer));
188
Create all threads for the thread pool
191
After threads are created we wait until all threads has signaled that
192
they have started before we return
196
1 We got an error creating the thread pool
197
In this case we will abort all created threads
200
static bool libevent_init(void)
208
kill_pool_threads= false;
210
pthread_mutex_init(&LOCK_event_loop, NULL);
211
pthread_mutex_init(&LOCK_thd_add, NULL);
213
/* set up the pipe used to add new thds to the event pool */
214
if (init_pipe(thd_add_pipe))
216
sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
219
/* set up the pipe used to kill thds in the event queue */
220
if (init_pipe(thd_kill_pipe))
222
sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
223
close(thd_add_pipe[0]);
224
close(thd_add_pipe[1]);
227
event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
228
libevent_add_thd_callback, NULL);
229
event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
230
libevent_kill_thd_callback, NULL);
232
if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
234
sql_print_error("thd_add_event event_add error in libevent_init\n");
239
/* Set up the thread pool */
240
created_threads= killed_threads= 0;
241
pthread_mutex_lock(&LOCK_thread_count);
243
for (i= 0; i < thread_pool_size; i++)
247
if ((error= pthread_create(&thread, &connection_attrib,
248
libevent_thread_proc, 0)))
250
sql_print_error("Can't create completion port thread (error %d)",
252
pthread_mutex_unlock(&LOCK_thread_count);
253
libevent_end(); // Cleanup
258
/* Wait until all threads are created */
259
while (created_threads != thread_pool_size)
260
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
261
pthread_mutex_unlock(&LOCK_thread_count);
268
This is called when data is ready on the socket.
271
This is only called by the thread that owns LOCK_event_loop.
273
We add the thd that got the data to thds_need_processing, and
274
cause the libevent event_loop() to terminate. Then this same thread will
275
return from event_loop and pick the thd value back up for processing.
278
void libevent_io_callback(int, short, void *ctx)
280
safe_mutex_assert_owner(&LOCK_event_loop);
282
thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
283
thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
287
This is called when we have a thread we want to be killed.
290
This is only called by the thread that owns LOCK_event_loop.
293
void libevent_kill_thd_callback(int Fd, short, void*)
295
safe_mutex_assert_owner(&LOCK_event_loop);
297
/* clear the pending events */
299
while (read(Fd, &c, sizeof(c)) == sizeof(c))
302
LIST* list= thds_waiting_for_io;
305
THD *thd= (THD*)list->data;
306
list= list_rest(list);
307
if (thd->killed == THD::KILL_CONNECTION)
310
Delete from libevent and add to the processing queue.
312
event_del(thd->scheduler.io_event);
313
thds_waiting_for_io= list_delete(thds_waiting_for_io,
314
&thd->scheduler.list);
315
thds_need_processing= list_add(thds_need_processing,
316
&thd->scheduler.list);
323
This is used to add connections to the pool. This callback is invoked from
324
the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
328
This is only called by the thread that owns LOCK_event_loop.
331
void libevent_add_thd_callback(int Fd, short, void *)
333
safe_mutex_assert_owner(&LOCK_event_loop);
335
/* clear the pending events */
337
while (read(Fd, &c, sizeof(c)) == sizeof(c))
340
pthread_mutex_lock(&LOCK_thd_add);
341
while (thds_need_adding)
343
/* pop the first thd off the list */
344
THD* thd= (THD*)thds_need_adding->data;
345
thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
347
pthread_mutex_unlock(&LOCK_thd_add);
349
if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
352
Add thd to thds_need_processing list. If it needs closing we'll close
353
it outside of event_loop().
355
thds_need_processing= list_add(thds_need_processing,
356
&thd->scheduler.list);
360
/* Add to libevent */
361
if (event_add(thd->scheduler.io_event, NULL))
363
sql_print_error("event_add error in libevent_add_thd_callback\n");
364
libevent_connection_close(thd);
368
thds_waiting_for_io= list_add(thds_waiting_for_io,
369
&thd->scheduler.list);
372
pthread_mutex_lock(&LOCK_thd_add);
374
pthread_mutex_unlock(&LOCK_thd_add);
379
Notify the thread pool about a new connection
382
LOCK_thread_count is locked on entry. This function MUST unlock it!
385
static void libevent_add_connection(THD *thd)
387
if (thd->scheduler.init(thd))
389
sql_print_error("Scheduler init error in libevent_add_new_connection\n");
390
pthread_mutex_unlock(&LOCK_thread_count);
391
libevent_connection_close(thd);
395
libevent_thd_add(thd);
397
pthread_mutex_unlock(&LOCK_thread_count);
403
@brief Signal a waiting connection it's time to die.
405
@details This function will signal libevent the THD should be killed.
406
Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
409
@param[in] thd The connection to kill
412
static void libevent_post_kill_notification(THD *)
415
Note, we just wake up libevent with an event that a THD should be killed,
416
It will search its list of thds for thd->killed == KILL_CONNECTION to
417
find the THDs it should kill.
419
So we don't actually tell it which one and we don't actually use the
420
THD being passed to us, but that's just a design detail that could change
424
write(thd_kill_pipe[1], &c, sizeof(c));
429
Close and delete a connection.
432
static void libevent_connection_close(THD *thd)
434
thd->killed= THD::KILL_CONNECTION; // Avoid error messages
436
if (thd->net.vio->sd >= 0) // not already closed
439
close_connection(thd, 0, 1);
441
thd->scheduler.thread_detach();
442
unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
443
pthread_mutex_unlock(&LOCK_thread_count);
450
Returns true if we should close and delete a THD connection.
453
static bool libevent_should_close_connection(THD* thd)
455
return thd->net.error ||
457
thd->killed == THD::KILL_CONNECTION;
462
libevent_thread_proc is the outer loop of each thread in the thread pool.
463
These procs only return/terminate on shutdown (kill_pool_threads == true).
466
pthread_handler_t libevent_thread_proc(void *arg __attribute__((__unused__)))
468
if (init_new_connection_handler_thread())
470
my_thread_global_end();
471
sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
476
Signal libevent_init() when all threads has been created and are ready to
479
(void) pthread_mutex_lock(&LOCK_thread_count);
481
if (created_threads == thread_pool_size)
482
(void) pthread_cond_signal(&COND_thread_count);
483
(void) pthread_mutex_unlock(&LOCK_thread_count);
488
(void) pthread_mutex_lock(&LOCK_event_loop);
490
/* get thd(s) to process */
491
while (!thds_need_processing)
493
if (kill_pool_threads)
495
/* the flag that we should die has been set */
496
(void) pthread_mutex_unlock(&LOCK_event_loop);
499
event_loop(EVLOOP_ONCE);
502
/* pop the first thd off the list */
503
thd= (THD*)thds_need_processing->data;
504
thds_need_processing= list_delete(thds_need_processing,
505
thds_need_processing);
507
(void) pthread_mutex_unlock(&LOCK_event_loop);
509
/* now we process the connection (thd) */
511
/* set up the thd<->thread links. */
512
thd->thread_stack= (char*) &thd;
514
if (thd->scheduler.thread_attach())
516
libevent_connection_close(thd);
520
/* is the connection logged in yet? */
521
if (!thd->scheduler.logged_in)
523
if (login_connection(thd))
525
/* Failed to log in */
526
libevent_connection_close(thd);
531
/* login successful */
532
thd->scheduler.logged_in= true;
533
prepare_new_connection_state(thd);
534
if (!libevent_needs_immediate_processing(thd))
535
continue; /* New connection is now waiting for data in libevent*/
541
/* Process a query */
544
libevent_connection_close(thd);
547
} while (libevent_needs_immediate_processing(thd));
551
(void) pthread_mutex_lock(&LOCK_thread_count);
553
pthread_cond_broadcast(&COND_thread_count);
554
(void) pthread_mutex_unlock(&LOCK_thread_count);
557
return(0); /* purify: deadcode */
562
Returns true if the connection needs immediate processing and false if
563
instead it's queued for libevent processing or closed,
566
static bool libevent_needs_immediate_processing(THD *thd)
568
if (libevent_should_close_connection(thd))
570
libevent_connection_close(thd);
574
If more data in the socket buffer, return true to process another command.
576
Note: we cannot add for event processing because the whole request might
577
already be buffered and we wouldn't receive an event.
579
if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
582
thd->scheduler.thread_detach();
583
libevent_thd_add(thd);
589
Adds a THD to queued for libevent processing.
591
This call does not actually register the event with libevent.
592
Instead, it places the THD onto a queue and signals libevent by writing
593
a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
594
be invoked which will find the THD on the queue and add it to libevent.
597
static void libevent_thd_add(THD* thd)
600
pthread_mutex_lock(&LOCK_thd_add);
601
/* queue for libevent */
602
thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
603
/* notify libevent */
604
write(thd_add_pipe[1], &c, sizeof(c));
605
pthread_mutex_unlock(&LOCK_thd_add);
610
Wait until all pool threads have been deleted for clean shutdown
613
static void libevent_end()
615
(void) pthread_mutex_lock(&LOCK_thread_count);
617
kill_pool_threads= true;
618
while (killed_threads != created_threads)
620
/* wake up the event loop */
622
write(thd_add_pipe[1], &c, sizeof(c));
624
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
626
(void) pthread_mutex_unlock(&LOCK_thread_count);
628
event_del(&thd_add_event);
629
close(thd_add_pipe[0]);
630
close(thd_add_pipe[1]);
631
event_del(&thd_kill_event);
632
close(thd_kill_pipe[0]);
633
close(thd_kill_pipe[1]);
635
(void) pthread_mutex_destroy(&LOCK_event_loop);
636
(void) pthread_mutex_destroy(&LOCK_thd_add);
641
void pool_of_threads_scheduler(scheduler_functions* func)
643
func->max_threads= thread_pool_size;
644
func->init= libevent_init;
645
func->end= libevent_end;
646
func->post_kill_notification= libevent_post_kill_notification;
647
func->add_connection= libevent_add_connection;