1
/* Copyright (C) 2007 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
Implementation for the thread scheduler
20
#include <drizzled/server_includes.h>
25
'Dummy' functions to be used when we don't need any handling for a scheduler
29
/* No-op scheduler init hook: does nothing and always returns false (0). */
static bool init_dummy(void)
{
  return false;
}
30
/* No-op post-kill notification hook: the THD argument is ignored. */
static void post_kill_dummy(THD *thd __attribute__((unused)))
{
}
31
/* No-op scheduler shutdown hook: does nothing. */
static void end_dummy(void)
{
}
32
static bool end_thread_dummy(THD *thd __attribute__((unused)),
33
bool cache_thread __attribute__((unused)))
37
Initialize default scheduler with dummy functions so that setup functions
38
only need to declare those that are relevant for their usage
41
scheduler_functions::scheduler_functions()
43
init_new_connection_thread(init_new_connection_handler_thread),
44
add_connection(0), // Must be defined
45
post_kill_notification(post_kill_dummy),
46
end_thread(end_thread_dummy), end(end_dummy)
49
static uint created_threads, killed_threads;
50
static bool kill_pool_threads;
52
static struct event thd_add_event;
53
static struct event thd_kill_event;
55
static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
56
static LIST *thds_need_adding; /* list of thds to add to libevent queue */
58
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
59
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
62
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
63
event_del) and thds_need_processing and thds_waiting_for_io.
65
static pthread_mutex_t LOCK_event_loop;
66
static LIST *thds_need_processing; /* list of thds that needs some processing */
67
static LIST *thds_waiting_for_io; /* list of thds with added events */
69
pthread_handler_t libevent_thread_proc(void *arg);
70
static void libevent_end();
71
static bool libevent_needs_immediate_processing(THD *thd);
72
static void libevent_connection_close(THD *thd);
73
static bool libevent_should_close_connection(THD* thd);
74
static void libevent_thd_add(THD* thd);
75
void libevent_io_callback(int Fd, short Operation, void *ctx);
76
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
77
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
81
Create a pipe and set to non-blocking.
82
Returns true if there is an error.
85
/*
  Create a pipe and switch BOTH ends to non-blocking mode.

  @param pipe_fds  Receives the two descriptors: [0] is the read end,
                   [1] is the write end.

  @retval false  Success; both descriptors are open and have O_NONBLOCK set.
  @retval true   Failure; no descriptor created by this call is left open.
*/
static bool init_pipe(int pipe_fds[])
{
  int i;

  if (pipe(pipe_fds) < 0)
    return true;

  /*
    Apply O_NONBLOCK to the read end and to the write end. Previously a
    stray ';' ended the return expression after the read end, so the write
    end was silently left in blocking mode.
  */
  for (i= 0; i < 2; i++)
  {
    int flags= fcntl(pipe_fds[i], F_GETFL);
    if (flags == -1 ||
        fcntl(pipe_fds[i], F_SETFL, flags | O_NONBLOCK) == -1)
    {
      /* Do not leak the descriptors of a half-configured pipe. */
      close(pipe_fds[0]);
      close(pipe_fds[1]);
      return true;
    }
  }
  return false;
}
97
thd_scheduler keeps the link between THD and events.
98
It's embedded in the THD class.
101
/*
  Construct the per-connection scheduler state: not logged in, no libevent
  event allocated yet, not attached to any thread, and an empty
  dbug explain string.
*/
thd_scheduler::thd_scheduler()
  : logged_in(false),
    io_event(NULL),
    thread_attached(false)
{
  dbug_explain_buf[0]= '\0';
}
108
/*
  Release the libevent event structure owned by this object.
  MY_ALLOW_ZERO_PTR is passed because io_event is still NULL when init()
  was never called (the constructor sets it to NULL).
*/
thd_scheduler::~thd_scheduler()
{
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
}
114
/*
  Allocate and set up the libevent read event for a connection.

  The event is bound to the socket descriptor of parent_thd's vio and
  dispatches to libevent_io_callback with parent_thd as its context.
  parent_thd is also stored in list.data so that the scheduler list node
  can be mapped back to its THD.

  @param parent_thd  The THD this scheduler object belongs to.

  @retval false  Success.
  @retval true   Memory allocation failed (an error has been logged).
*/
bool thd_scheduler::init(THD *parent_thd)
{
  void *event_mem= my_malloc(sizeof(*io_event), MYF(MY_ZEROFILL|MY_WME));

  io_event= (struct event*) event_mem;
  if (io_event == NULL)
  {
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
    return true;
  }

  event_set(io_event,
            parent_thd->net.vio->sd,
            EV_READ,
            libevent_io_callback,
            (void*) parent_thd);

  list.data= parent_thd;

  return false;
}
135
Attach/associate the connection with the OS thread, for command processing.
138
bool thd_scheduler::thread_attach()
140
assert(!thread_attached);
141
THD* thd = (THD*)list.data;
142
if (libevent_should_close_connection(thd) ||
143
setup_connection_thread_globals(thd))
148
thd->mysys_var->abort= 0;
149
thread_attached= true;
156
Detach/disassociate the connection with the OS thread.
159
void thd_scheduler::thread_detach()
163
THD* thd = (THD*)list.data;
164
thd->mysys_var= NULL;
165
thread_attached= false;
172
Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
174
This is used to preserve the SESSION DEBUG variable, which is mapped to the OS
175
thread during a command, but each command is handled by a different thread.
177
void thd_scheduler::swap_dbug_explain()
179
char buffer[sizeof(dbug_explain_buf)];
180
memcpy(dbug_explain_buf, buffer, sizeof(buffer));
184
Create all threads for the thread pool
187
After threads are created we wait until all threads have signaled that
188
they have started before we return
192
1 We got an error creating the thread pool
193
In this case we will abort all created threads
196
static bool libevent_init(void)
204
kill_pool_threads= false;
206
pthread_mutex_init(&LOCK_event_loop, NULL);
207
pthread_mutex_init(&LOCK_thd_add, NULL);
209
/* set up the pipe used to add new thds to the event pool */
210
if (init_pipe(thd_add_pipe))
212
sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
215
/* set up the pipe used to kill thds in the event queue */
216
if (init_pipe(thd_kill_pipe))
218
sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
219
close(thd_add_pipe[0]);
220
close(thd_add_pipe[1]);
223
event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
224
libevent_add_thd_callback, NULL);
225
event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
226
libevent_kill_thd_callback, NULL);
228
if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
230
sql_print_error("thd_add_event event_add error in libevent_init\n");
235
/* Set up the thread pool */
236
created_threads= killed_threads= 0;
237
pthread_mutex_lock(&LOCK_thread_count);
239
for (i= 0; i < thread_pool_size; i++)
243
if ((error= pthread_create(&thread, &connection_attrib,
244
libevent_thread_proc, 0)))
246
sql_print_error("Can't create completion port thread (error %d)",
248
pthread_mutex_unlock(&LOCK_thread_count);
249
libevent_end(); // Cleanup
254
/* Wait until all threads are created */
255
while (created_threads != thread_pool_size)
256
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
257
pthread_mutex_unlock(&LOCK_thread_count);
264
This is called when data is ready on the socket.
267
This is only called by the thread that owns LOCK_event_loop.
269
We add the thd that got the data to thds_need_processing, and
270
cause the libevent event_loop() to terminate. Then this same thread will
271
return from event_loop and pick the thd value back up for processing.
274
void libevent_io_callback(int, short, void *ctx)
276
safe_mutex_assert_owner(&LOCK_event_loop);
278
thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
279
thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
283
This is called when we have a thread we want to be killed.
286
This is only called by the thread that owns LOCK_event_loop.
289
void libevent_kill_thd_callback(int Fd, short, void*)
291
safe_mutex_assert_owner(&LOCK_event_loop);
293
/* clear the pending events */
295
while (read(Fd, &c, sizeof(c)) == sizeof(c))
298
LIST* list= thds_waiting_for_io;
301
THD *thd= (THD*)list->data;
302
list= list_rest(list);
303
if (thd->killed == THD::KILL_CONNECTION)
306
Delete from libevent and add to the processing queue.
308
event_del(thd->scheduler.io_event);
309
thds_waiting_for_io= list_delete(thds_waiting_for_io,
310
&thd->scheduler.list);
311
thds_need_processing= list_add(thds_need_processing,
312
&thd->scheduler.list);
319
This is used to add connections to the pool. This callback is invoked from
320
the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
324
This is only called by the thread that owns LOCK_event_loop.
327
void libevent_add_thd_callback(int Fd, short, void *)
329
safe_mutex_assert_owner(&LOCK_event_loop);
331
/* clear the pending events */
333
while (read(Fd, &c, sizeof(c)) == sizeof(c))
336
pthread_mutex_lock(&LOCK_thd_add);
337
while (thds_need_adding)
339
/* pop the first thd off the list */
340
THD* thd= (THD*)thds_need_adding->data;
341
thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
343
pthread_mutex_unlock(&LOCK_thd_add);
345
if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
348
Add thd to thds_need_processing list. If it needs closing we'll close
349
it outside of event_loop().
351
thds_need_processing= list_add(thds_need_processing,
352
&thd->scheduler.list);
356
/* Add to libevent */
357
if (event_add(thd->scheduler.io_event, NULL))
359
sql_print_error("event_add error in libevent_add_thd_callback\n");
360
libevent_connection_close(thd);
364
thds_waiting_for_io= list_add(thds_waiting_for_io,
365
&thd->scheduler.list);
368
pthread_mutex_lock(&LOCK_thd_add);
370
pthread_mutex_unlock(&LOCK_thd_add);
375
Notify the thread pool about a new connection
378
LOCK_thread_count is locked on entry. This function MUST unlock it!
381
static void libevent_add_connection(THD *thd)
383
if (thd->scheduler.init(thd))
385
sql_print_error("Scheduler init error in libevent_add_new_connection\n");
386
pthread_mutex_unlock(&LOCK_thread_count);
387
libevent_connection_close(thd);
391
libevent_thd_add(thd);
393
pthread_mutex_unlock(&LOCK_thread_count);
399
@brief Signal a waiting connection it's time to die.
401
@details This function will signal libevent the THD should be killed.
402
Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
405
@param[in] thd The connection to kill
408
static void libevent_post_kill_notification(THD *)
411
Note, we just wake up libevent with an event that a THD should be killed,
412
It will search its list of thds for thd->killed == KILL_CONNECTION to
413
find the THDs it should kill.
415
So we don't actually tell it which one and we don't actually use the
416
THD being passed to us, but that's just a design detail that could change
420
write(thd_kill_pipe[1], &c, sizeof(c));
425
Close and delete a connection.
428
static void libevent_connection_close(THD *thd)
430
thd->killed= THD::KILL_CONNECTION; // Avoid error messages
432
if (thd->net.vio->sd >= 0) // not already closed
435
close_connection(thd, 0, 1);
437
thd->scheduler.thread_detach();
438
unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
439
pthread_mutex_unlock(&LOCK_thread_count);
446
Returns true if we should close and delete a THD connection.
449
static bool libevent_should_close_connection(THD* thd)
451
return thd->net.error ||
453
thd->killed == THD::KILL_CONNECTION;
458
libevent_thread_proc is the outer loop of each thread in the thread pool.
459
These procs only return/terminate on shutdown (kill_pool_threads == true).
462
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
464
if (init_new_connection_handler_thread())
466
my_thread_global_end();
467
sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
472
Signal libevent_init() when all threads have been created and are ready to
475
(void) pthread_mutex_lock(&LOCK_thread_count);
477
if (created_threads == thread_pool_size)
478
(void) pthread_cond_signal(&COND_thread_count);
479
(void) pthread_mutex_unlock(&LOCK_thread_count);
484
(void) pthread_mutex_lock(&LOCK_event_loop);
486
/* get thd(s) to process */
487
while (!thds_need_processing)
489
if (kill_pool_threads)
491
/* the flag that we should die has been set */
492
(void) pthread_mutex_unlock(&LOCK_event_loop);
495
event_loop(EVLOOP_ONCE);
498
/* pop the first thd off the list */
499
thd= (THD*)thds_need_processing->data;
500
thds_need_processing= list_delete(thds_need_processing,
501
thds_need_processing);
503
(void) pthread_mutex_unlock(&LOCK_event_loop);
505
/* now we process the connection (thd) */
507
/* set up the thd<->thread links. */
508
thd->thread_stack= (char*) &thd;
510
if (thd->scheduler.thread_attach())
512
libevent_connection_close(thd);
516
/* is the connection logged in yet? */
517
if (!thd->scheduler.logged_in)
519
if (login_connection(thd))
521
/* Failed to log in */
522
libevent_connection_close(thd);
527
/* login successful */
528
thd->scheduler.logged_in= true;
529
prepare_new_connection_state(thd);
530
if (!libevent_needs_immediate_processing(thd))
531
continue; /* New connection is now waiting for data in libevent*/
537
/* Process a query */
540
libevent_connection_close(thd);
543
} while (libevent_needs_immediate_processing(thd));
547
(void) pthread_mutex_lock(&LOCK_thread_count);
549
pthread_cond_broadcast(&COND_thread_count);
550
(void) pthread_mutex_unlock(&LOCK_thread_count);
553
return(0); /* purify: deadcode */
558
Returns true if the connection needs immediate processing and false if
559
instead it's queued for libevent processing or closed.
562
static bool libevent_needs_immediate_processing(THD *thd)
564
if (libevent_should_close_connection(thd))
566
libevent_connection_close(thd);
570
If more data in the socket buffer, return true to process another command.
572
Note: we cannot add for event processing because the whole request might
573
already be buffered and we wouldn't receive an event.
575
if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
578
thd->scheduler.thread_detach();
579
libevent_thd_add(thd);
585
Queues a THD for libevent processing.
587
This call does not actually register the event with libevent.
588
Instead, it places the THD onto a queue and signals libevent by writing
589
a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
590
be invoked which will find the THD on the queue and add it to libevent.
593
static void libevent_thd_add(THD* thd)
596
pthread_mutex_lock(&LOCK_thd_add);
597
/* queue for libevent */
598
thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
599
/* notify libevent */
600
write(thd_add_pipe[1], &c, sizeof(c));
601
pthread_mutex_unlock(&LOCK_thd_add);
606
Wait until all pool threads have been deleted for clean shutdown
609
static void libevent_end()
611
(void) pthread_mutex_lock(&LOCK_thread_count);
613
kill_pool_threads= true;
614
while (killed_threads != created_threads)
616
/* wake up the event loop */
618
write(thd_add_pipe[1], &c, sizeof(c));
620
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
622
(void) pthread_mutex_unlock(&LOCK_thread_count);
624
event_del(&thd_add_event);
625
close(thd_add_pipe[0]);
626
close(thd_add_pipe[1]);
627
event_del(&thd_kill_event);
628
close(thd_kill_pipe[0]);
629
close(thd_kill_pipe[1]);
631
(void) pthread_mutex_destroy(&LOCK_event_loop);
632
(void) pthread_mutex_destroy(&LOCK_thd_add);
637
void pool_of_threads_scheduler(scheduler_functions* func)
639
func->max_threads= thread_pool_size;
640
func->init= libevent_init;
641
func->end= libevent_end;
642
func->post_kill_notification= libevent_post_kill_notification;
643
func->add_connection= libevent_add_connection;