1
/* Copyright (C) 2007 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
Implementation for the thread scheduler
20
#ifdef USE_PRAGMA_INTERFACE
21
#pragma implementation
24
#include <mysql_priv.h>
29
'Dummy' functions to be used when we don't need any handling for a scheduler
33
static bool init_dummy(void) {return 0;}
34
static void post_kill_dummy(THD *thd) {}
35
static void end_dummy(void) {}
36
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
39
Initialize default scheduler with dummy functions so that setup functions
40
only need to declare those that are relvant for their usage
43
scheduler_functions::scheduler_functions()
45
init_new_connection_thread(init_new_connection_handler_thread),
46
add_connection(0), // Must be defined
47
post_kill_notification(post_kill_dummy),
48
end_thread(end_thread_dummy), end(end_dummy)
53
End connection, in case when we are using 'no-threads'
56
static bool no_threads_end(THD *thd, bool put_in_cache)
59
pthread_mutex_unlock(&LOCK_thread_count);
60
return 1; // Abort handle_one_connection
65
Initailize scheduler for --thread-handling=no-threads
68
void one_thread_scheduler(scheduler_functions *func)
71
func->add_connection= handle_connection_in_main_thread;
72
func->init_new_connection_thread= init_dummy;
73
func->end_thread= no_threads_end;
78
Initialize scheduler for --thread-handling=one-thread-per-connection
81
void one_thread_per_connection_scheduler(scheduler_functions *func)
83
func->max_threads= max_connections;
84
func->add_connection= create_thread_to_handle_connection;
85
func->end_thread= one_thread_per_connection_end;
88
static uint created_threads, killed_threads;
89
static bool kill_pool_threads;
91
static struct event thd_add_event;
92
static struct event thd_kill_event;
94
static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
95
static LIST *thds_need_adding; /* list of thds to add to libevent queue */
97
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
98
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
101
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
102
event_del) and thds_need_processing and thds_waiting_for_io.
104
static pthread_mutex_t LOCK_event_loop;
105
static LIST *thds_need_processing; /* list of thds that needs some processing */
106
static LIST *thds_waiting_for_io; /* list of thds with added events */
108
pthread_handler_t libevent_thread_proc(void *arg);
109
static void libevent_end();
110
static bool libevent_needs_immediate_processing(THD *thd);
111
static void libevent_connection_close(THD *thd);
112
static bool libevent_should_close_connection(THD* thd);
113
static void libevent_thd_add(THD* thd);
114
void libevent_io_callback(int Fd, short Operation, void *ctx);
115
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
116
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
120
Create a pipe and set to non-blocking.
121
Returns TRUE if there is an error.
124
static bool init_pipe(int pipe_fds[])
127
return pipe(pipe_fds) < 0 ||
128
(flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
129
fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
130
(flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
131
fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
136
thd_scheduler keeps the link between THD and events.
137
It's embedded in the THD class.
140
thd_scheduler::thd_scheduler()
141
: logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
144
dbug_explain_buf[0]= 0;
149
thd_scheduler::~thd_scheduler()
151
my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
155
bool thd_scheduler::init(THD *parent_thd)
158
(struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
162
sql_print_error("Memory allocation error in thd_scheduler::init\n");
166
event_set(io_event, parent_thd->net.vio->sd, EV_READ,
167
libevent_io_callback, (void*)parent_thd);
169
list.data= parent_thd;
176
Attach/associate the connection with the OS thread, for command processing.
179
bool thd_scheduler::thread_attach()
181
DBUG_ASSERT(!thread_attached);
182
THD* thd = (THD*)list.data;
183
if (libevent_should_close_connection(thd) ||
184
setup_connection_thread_globals(thd))
189
thd->mysys_var->abort= 0;
190
thread_attached= TRUE;
199
Detach/disassociate the connection with the OS thread.
202
void thd_scheduler::thread_detach()
206
THD* thd = (THD*)list.data;
207
thd->mysys_var= NULL;
208
thread_attached= FALSE;
217
Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
219
This is used to preserve the SESSION DEBUG variable, which is mapped to the OS
220
thread during a command, but each command is handled by a different thread.
224
void thd_scheduler::swap_dbug_explain()
226
char buffer[sizeof(dbug_explain_buf)];
227
if (DBUG_EXPLAIN(buffer, sizeof(buffer)))
228
sql_print_error("DBUG_EXPLAIN buffer too small.\n");
230
DBUG_PUSH(dbug_explain_buf);
231
memcpy(dbug_explain_buf, buffer, sizeof(buffer));
236
Create all threads for the thread pool
239
After threads are created we wait until all threads has signaled that
240
they have started before we return
244
1 We got an error creating the thread pool
245
In this case we will abort all created threads
248
static bool libevent_init(void)
251
DBUG_ENTER("libevent_init");
257
kill_pool_threads= FALSE;
259
pthread_mutex_init(&LOCK_event_loop, NULL);
260
pthread_mutex_init(&LOCK_thd_add, NULL);
262
/* set up the pipe used to add new thds to the event pool */
263
if (init_pipe(thd_add_pipe))
265
sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
268
/* set up the pipe used to kill thds in the event queue */
269
if (init_pipe(thd_kill_pipe))
271
sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
272
close(thd_add_pipe[0]);
273
close(thd_add_pipe[1]);
276
event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
277
libevent_add_thd_callback, NULL);
278
event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
279
libevent_kill_thd_callback, NULL);
281
if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
283
sql_print_error("thd_add_event event_add error in libevent_init\n");
288
/* Set up the thread pool */
289
created_threads= killed_threads= 0;
290
pthread_mutex_lock(&LOCK_thread_count);
292
for (i= 0; i < thread_pool_size; i++)
296
if ((error= pthread_create(&thread, &connection_attrib,
297
libevent_thread_proc, 0)))
299
sql_print_error("Can't create completion port thread (error %d)",
301
pthread_mutex_unlock(&LOCK_thread_count);
302
libevent_end(); // Cleanup
307
/* Wait until all threads are created */
308
while (created_threads != thread_pool_size)
309
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
310
pthread_mutex_unlock(&LOCK_thread_count);
312
DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
318
This is called when data is ready on the socket.
321
This is only called by the thread that owns LOCK_event_loop.
323
We add the thd that got the data to thds_need_processing, and
324
cause the libevent event_loop() to terminate. Then this same thread will
325
return from event_loop and pick the thd value back up for processing.
328
void libevent_io_callback(int, short, void *ctx)
330
safe_mutex_assert_owner(&LOCK_event_loop);
332
thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
333
thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
337
This is called when we have a thread we want to be killed.
340
This is only called by the thread that owns LOCK_event_loop.
343
void libevent_kill_thd_callback(int Fd, short, void*)
345
safe_mutex_assert_owner(&LOCK_event_loop);
347
/* clear the pending events */
349
while (read(Fd, &c, sizeof(c)) == sizeof(c))
352
LIST* list= thds_waiting_for_io;
355
THD *thd= (THD*)list->data;
356
list= list_rest(list);
357
if (thd->killed == THD::KILL_CONNECTION)
360
Delete from libevent and add to the processing queue.
362
event_del(thd->scheduler.io_event);
363
thds_waiting_for_io= list_delete(thds_waiting_for_io,
364
&thd->scheduler.list);
365
thds_need_processing= list_add(thds_need_processing,
366
&thd->scheduler.list);
373
This is used to add connections to the pool. This callback is invoked from
374
the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
378
This is only called by the thread that owns LOCK_event_loop.
381
void libevent_add_thd_callback(int Fd, short, void *)
383
safe_mutex_assert_owner(&LOCK_event_loop);
385
/* clear the pending events */
387
while (read(Fd, &c, sizeof(c)) == sizeof(c))
390
pthread_mutex_lock(&LOCK_thd_add);
391
while (thds_need_adding)
393
/* pop the first thd off the list */
394
THD* thd= (THD*)thds_need_adding->data;
395
thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
397
pthread_mutex_unlock(&LOCK_thd_add);
399
if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
402
Add thd to thds_need_processing list. If it needs closing we'll close
403
it outside of event_loop().
405
thds_need_processing= list_add(thds_need_processing,
406
&thd->scheduler.list);
410
/* Add to libevent */
411
if (event_add(thd->scheduler.io_event, NULL))
413
sql_print_error("event_add error in libevent_add_thd_callback\n");
414
libevent_connection_close(thd);
418
thds_waiting_for_io= list_add(thds_waiting_for_io,
419
&thd->scheduler.list);
422
pthread_mutex_lock(&LOCK_thd_add);
424
pthread_mutex_unlock(&LOCK_thd_add);
429
Notify the thread pool about a new connection
432
LOCK_thread_count is locked on entry. This function MUST unlock it!
435
static void libevent_add_connection(THD *thd)
437
DBUG_ENTER("libevent_add_connection");
438
DBUG_PRINT("enter", ("thd: 0x%lx thread_id: %lu",
439
(long) thd, thd->thread_id));
441
if (thd->scheduler.init(thd))
443
sql_print_error("Scheduler init error in libevent_add_new_connection\n");
444
pthread_mutex_unlock(&LOCK_thread_count);
445
libevent_connection_close(thd);
449
libevent_thd_add(thd);
451
pthread_mutex_unlock(&LOCK_thread_count);
457
@brief Signal a waiting connection it's time to die.
459
@details This function will signal libevent the THD should be killed.
460
Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
463
@param[in] thd The connection to kill
466
static void libevent_post_kill_notification(THD *)
469
Note, we just wake up libevent with an event that a THD should be killed,
470
It will search its list of thds for thd->killed == KILL_CONNECTION to
471
find the THDs it should kill.
473
So we don't actually tell it which one and we don't actually use the
474
THD being passed to us, but that's just a design detail that could change
478
write(thd_kill_pipe[1], &c, sizeof(c));
483
Close and delete a connection.
486
static void libevent_connection_close(THD *thd)
488
DBUG_ENTER("libevent_connection_close");
489
DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
491
thd->killed= THD::KILL_CONNECTION; // Avoid error messages
493
if (thd->net.vio->sd >= 0) // not already closed
496
close_connection(thd, 0, 1);
498
thd->scheduler.thread_detach();
499
unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
500
pthread_mutex_unlock(&LOCK_thread_count);
507
Returns true if we should close and delete a THD connection.
510
static bool libevent_should_close_connection(THD* thd)
512
return thd->net.error ||
514
thd->killed == THD::KILL_CONNECTION;
519
libevent_thread_proc is the outer loop of each thread in the thread pool.
520
These procs only return/terminate on shutdown (kill_pool_threads == true).
523
pthread_handler_t libevent_thread_proc(void *arg)
525
if (init_new_connection_handler_thread())
527
my_thread_global_end();
528
sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
531
DBUG_ENTER("libevent_thread_proc");
534
Signal libevent_init() when all threads has been created and are ready to
537
(void) pthread_mutex_lock(&LOCK_thread_count);
539
if (created_threads == thread_pool_size)
540
(void) pthread_cond_signal(&COND_thread_count);
541
(void) pthread_mutex_unlock(&LOCK_thread_count);
546
(void) pthread_mutex_lock(&LOCK_event_loop);
548
/* get thd(s) to process */
549
while (!thds_need_processing)
551
if (kill_pool_threads)
553
/* the flag that we should die has been set */
554
(void) pthread_mutex_unlock(&LOCK_event_loop);
557
event_loop(EVLOOP_ONCE);
560
/* pop the first thd off the list */
561
thd= (THD*)thds_need_processing->data;
562
thds_need_processing= list_delete(thds_need_processing,
563
thds_need_processing);
565
(void) pthread_mutex_unlock(&LOCK_event_loop);
567
/* now we process the connection (thd) */
569
/* set up the thd<->thread links. */
570
thd->thread_stack= (char*) &thd;
572
if (thd->scheduler.thread_attach())
574
libevent_connection_close(thd);
578
/* is the connection logged in yet? */
579
if (!thd->scheduler.logged_in)
581
DBUG_PRINT("info", ("init new connection. sd: %d",
583
if (login_connection(thd))
585
/* Failed to log in */
586
libevent_connection_close(thd);
591
/* login successful */
592
thd->scheduler.logged_in= TRUE;
593
prepare_new_connection_state(thd);
594
if (!libevent_needs_immediate_processing(thd))
595
continue; /* New connection is now waiting for data in libevent*/
601
/* Process a query */
604
libevent_connection_close(thd);
607
} while (libevent_needs_immediate_processing(thd));
611
DBUG_PRINT("exit", ("ending thread"));
612
(void) pthread_mutex_lock(&LOCK_thread_count);
614
pthread_cond_broadcast(&COND_thread_count);
615
(void) pthread_mutex_unlock(&LOCK_thread_count);
618
DBUG_RETURN(0); /* purify: deadcode */
623
Returns TRUE if the connection needs immediate processing and FALSE if
624
instead it's queued for libevent processing or closed,
627
static bool libevent_needs_immediate_processing(THD *thd)
629
if (libevent_should_close_connection(thd))
631
libevent_connection_close(thd);
635
If more data in the socket buffer, return TRUE to process another command.
637
Note: we cannot add for event processing because the whole request might
638
already be buffered and we wouldn't receive an event.
640
if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
643
thd->scheduler.thread_detach();
644
libevent_thd_add(thd);
650
Adds a THD to queued for libevent processing.
652
This call does not actually register the event with libevent.
653
Instead, it places the THD onto a queue and signals libevent by writing
654
a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
655
be invoked which will find the THD on the queue and add it to libevent.
658
static void libevent_thd_add(THD* thd)
661
pthread_mutex_lock(&LOCK_thd_add);
662
/* queue for libevent */
663
thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
664
/* notify libevent */
665
write(thd_add_pipe[1], &c, sizeof(c));
666
pthread_mutex_unlock(&LOCK_thd_add);
671
Wait until all pool threads have been deleted for clean shutdown
674
static void libevent_end()
676
DBUG_ENTER("libevent_end");
677
DBUG_PRINT("enter", ("created_threads: %d killed_threads: %u",
678
created_threads, killed_threads));
681
(void) pthread_mutex_lock(&LOCK_thread_count);
683
kill_pool_threads= TRUE;
684
while (killed_threads != created_threads)
686
/* wake up the event loop */
688
write(thd_add_pipe[1], &c, sizeof(c));
690
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
692
(void) pthread_mutex_unlock(&LOCK_thread_count);
694
event_del(&thd_add_event);
695
close(thd_add_pipe[0]);
696
close(thd_add_pipe[1]);
697
event_del(&thd_kill_event);
698
close(thd_kill_pipe[0]);
699
close(thd_kill_pipe[1]);
701
(void) pthread_mutex_destroy(&LOCK_event_loop);
702
(void) pthread_mutex_destroy(&LOCK_thd_add);
707
void pool_of_threads_scheduler(scheduler_functions* func)
709
func->max_threads= thread_pool_size;
710
func->init= libevent_init;
711
func->end= libevent_end;
712
func->post_kill_notification= libevent_post_kill_notification;
713
func->add_connection= libevent_add_connection;