1
/* Copyright (C) 2007 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
Implementation for the thread scheduler
20
#include <drizzled/server_includes.h>
21
#include <libdrizzle/libdrizzle.h>
23
#include <drizzled/gettext.h>
27
'Dummy' functions to be used when we don't need any handling for a scheduler
31
static bool init_dummy(void) {return 0;}
32
static void post_kill_dummy(Session *session __attribute__((unused))) {}
33
static void end_dummy(void) {}
34
static bool end_thread_dummy(Session *session __attribute__((unused)),
35
bool cache_thread __attribute__((unused)))
39
Initialize default scheduler with dummy functions so that setup functions
40
only need to declare those that are relvant for their usage
43
scheduler_functions::scheduler_functions()
45
init_new_connection_thread(init_new_connection_handler_thread),
46
add_connection(0), // Must be defined
47
post_kill_notification(post_kill_dummy),
48
end_thread(end_thread_dummy), end(end_dummy)
51
static uint32_t created_threads, killed_threads;
52
static bool kill_pool_threads;
54
static struct event session_add_event;
55
static struct event session_kill_event;
57
static pthread_mutex_t LOCK_session_add; /* protects sessions_need_adding */
58
static LIST *sessions_need_adding; /* list of sessions to add to libevent queue */
60
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
61
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
64
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
65
event_del) and sessions_need_processing and sessions_waiting_for_io.
67
static pthread_mutex_t LOCK_event_loop;
68
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
69
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
71
pthread_handler_t libevent_thread_proc(void *arg);
72
static void libevent_end();
73
static bool libevent_needs_immediate_processing(Session *session);
74
static void libevent_connection_close(Session *session);
75
static bool libevent_should_close_connection(Session* session);
76
static void libevent_session_add(Session* session);
77
void libevent_io_callback(int Fd, short Operation, void *ctx);
78
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
79
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
83
Create a pipe and set to non-blocking.
84
Returns true if there is an error.
87
static bool init_pipe(int pipe_fds[])
90
return pipe(pipe_fds) < 0 ||
91
(flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
92
fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
93
(flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
94
fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
99
session_scheduler keeps the link between Session and events.
100
It's embedded in the Session class.
103
session_scheduler::session_scheduler()
104
: logged_in(false), io_event(NULL), thread_attached(false)
109
session_scheduler::~session_scheduler()
115
session_scheduler::session_scheduler(const session_scheduler&)
116
: logged_in(false), io_event(NULL), thread_attached(false)
119
void session_scheduler::operator=(const session_scheduler&)
122
bool session_scheduler::init(Session *parent_session)
125
(struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
129
sql_print_error(_("Memory allocation error in session_scheduler::init\n"));
133
event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
134
libevent_io_callback, (void*)parent_session);
136
list.data= parent_session;
143
Attach/associate the connection with the OS thread, for command processing.
146
bool session_scheduler::thread_attach()
148
assert(!thread_attached);
149
Session* session = (Session*)list.data;
150
if (libevent_should_close_connection(session) ||
151
setup_connection_thread_globals(session))
156
session->mysys_var->abort= 0;
157
thread_attached= true;
164
Detach/disassociate the connection with the OS thread.
167
void session_scheduler::thread_detach()
171
Session* session = (Session*)list.data;
172
session->mysys_var= NULL;
173
thread_attached= false;
178
Create all threads for the thread pool
181
After threads are created we wait until all threads has signaled that
182
they have started before we return
186
1 We got an error creating the thread pool
187
In this case we will abort all created threads
190
static bool libevent_init(void)
198
kill_pool_threads= false;
200
pthread_mutex_init(&LOCK_event_loop, NULL);
201
pthread_mutex_init(&LOCK_session_add, NULL);
203
/* set up the pipe used to add new sessions to the event pool */
204
if (init_pipe(session_add_pipe))
206
sql_print_error(_("init_pipe(session_add_pipe) error in libevent_init\n"));
209
/* set up the pipe used to kill sessions in the event queue */
210
if (init_pipe(session_kill_pipe))
212
sql_print_error(_("init_pipe(session_kill_pipe) error in libevent_init\n"));
213
close(session_add_pipe[0]);
214
close(session_add_pipe[1]);
217
event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
218
libevent_add_session_callback, NULL);
219
event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
220
libevent_kill_session_callback, NULL);
222
if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
224
sql_print_error(_("session_add_event event_add error in libevent_init\n"));
229
/* Set up the thread pool */
230
created_threads= killed_threads= 0;
231
pthread_mutex_lock(&LOCK_thread_count);
233
for (i= 0; i < thread_pool_size; i++)
237
if ((error= pthread_create(&thread, &connection_attrib,
238
libevent_thread_proc, 0)))
240
sql_print_error(_("Can't create completion port thread (error %d)"),
242
pthread_mutex_unlock(&LOCK_thread_count);
243
libevent_end(); // Cleanup
248
/* Wait until all threads are created */
249
while (created_threads != thread_pool_size)
250
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
251
pthread_mutex_unlock(&LOCK_thread_count);
258
This is called when data is ready on the socket.
261
This is only called by the thread that owns LOCK_event_loop.
263
We add the session that got the data to sessions_need_processing, and
264
cause the libevent event_loop() to terminate. Then this same thread will
265
return from event_loop and pick the session value back up for processing.
268
void libevent_io_callback(int, short, void *ctx)
270
safe_mutex_assert_owner(&LOCK_event_loop);
271
Session *session= (Session*)ctx;
272
sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
273
sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
277
This is called when we have a thread we want to be killed.
280
This is only called by the thread that owns LOCK_event_loop.
283
void libevent_kill_session_callback(int Fd, short, void*)
285
safe_mutex_assert_owner(&LOCK_event_loop);
287
/* clear the pending events */
289
while (read(Fd, &c, sizeof(c)) == sizeof(c))
292
LIST* list= sessions_waiting_for_io;
295
Session *session= (Session*)list->data;
296
list= list_rest(list);
297
if (session->killed == Session::KILL_CONNECTION)
300
Delete from libevent and add to the processing queue.
302
event_del(session->scheduler.io_event);
303
sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
304
&session->scheduler.list);
305
sessions_need_processing= list_add(sessions_need_processing,
306
&session->scheduler.list);
313
This is used to add connections to the pool. This callback is invoked from
314
the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
318
This is only called by the thread that owns LOCK_event_loop.
321
void libevent_add_session_callback(int Fd, short, void *)
323
safe_mutex_assert_owner(&LOCK_event_loop);
325
/* clear the pending events */
327
while (read(Fd, &c, sizeof(c)) == sizeof(c))
330
pthread_mutex_lock(&LOCK_session_add);
331
while (sessions_need_adding)
333
/* pop the first session off the list */
334
Session* session= (Session*)sessions_need_adding->data;
335
sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
337
pthread_mutex_unlock(&LOCK_session_add);
339
if (!session->scheduler.logged_in || libevent_should_close_connection(session))
342
Add session to sessions_need_processing list. If it needs closing we'll close
343
it outside of event_loop().
345
sessions_need_processing= list_add(sessions_need_processing,
346
&session->scheduler.list);
350
/* Add to libevent */
351
if (event_add(session->scheduler.io_event, NULL))
353
sql_print_error(_("event_add error in libevent_add_session_callback\n"));
354
libevent_connection_close(session);
358
sessions_waiting_for_io= list_add(sessions_waiting_for_io,
359
&session->scheduler.list);
362
pthread_mutex_lock(&LOCK_session_add);
364
pthread_mutex_unlock(&LOCK_session_add);
369
Notify the thread pool about a new connection
372
LOCK_thread_count is locked on entry. This function MUST unlock it!
375
static void libevent_add_connection(Session *session)
377
if (session->scheduler.init(session))
379
sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
380
pthread_mutex_unlock(&LOCK_thread_count);
381
libevent_connection_close(session);
384
threads.append(session);
385
libevent_session_add(session);
387
pthread_mutex_unlock(&LOCK_thread_count);
393
@brief Signal a waiting connection it's time to die.
395
@details This function will signal libevent the Session should be killed.
396
Either the global LOCK_session_count or the Session's LOCK_delete must be locked
399
@param[in] session The connection to kill
402
static void libevent_post_kill_notification(Session *)
405
Note, we just wake up libevent with an event that a Session should be killed,
406
It will search its list of sessions for session->killed == KILL_CONNECTION to
407
find the Sessions it should kill.
409
So we don't actually tell it which one and we don't actually use the
410
Session being passed to us, but that's just a design detail that could change
414
assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
419
Close and delete a connection.
422
static void libevent_connection_close(Session *session)
424
session->killed= Session::KILL_CONNECTION; // Avoid error messages
426
if (net_get_sd(&(session->net)) >= 0) // not already closed
428
end_connection(session);
429
close_connection(session, 0, 1);
431
session->scheduler.thread_detach();
432
unlink_session(session); /* locks LOCK_thread_count and deletes session */
433
pthread_mutex_unlock(&LOCK_thread_count);
440
Returns true if we should close and delete a Session connection.
443
static bool libevent_should_close_connection(Session* session)
445
return net_should_close(&(session->net)) ||
446
session->killed == Session::KILL_CONNECTION;
451
libevent_thread_proc is the outer loop of each thread in the thread pool.
452
These procs only return/terminate on shutdown (kill_pool_threads == true).
455
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
457
if (init_new_connection_handler_thread())
459
my_thread_global_end();
460
sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
465
Signal libevent_init() when all threads has been created and are ready to
468
(void) pthread_mutex_lock(&LOCK_thread_count);
470
if (created_threads == thread_pool_size)
471
(void) pthread_cond_signal(&COND_thread_count);
472
(void) pthread_mutex_unlock(&LOCK_thread_count);
476
Session *session= NULL;
477
(void) pthread_mutex_lock(&LOCK_event_loop);
479
/* get session(s) to process */
480
while (!sessions_need_processing)
482
if (kill_pool_threads)
484
/* the flag that we should die has been set */
485
(void) pthread_mutex_unlock(&LOCK_event_loop);
488
event_loop(EVLOOP_ONCE);
491
/* pop the first session off the list */
492
session= (Session*)sessions_need_processing->data;
493
sessions_need_processing= list_delete(sessions_need_processing,
494
sessions_need_processing);
496
(void) pthread_mutex_unlock(&LOCK_event_loop);
498
/* now we process the connection (session) */
500
/* set up the session<->thread links. */
501
session->thread_stack= (char*) &session;
503
if (session->scheduler.thread_attach())
505
libevent_connection_close(session);
509
/* is the connection logged in yet? */
510
if (!session->scheduler.logged_in)
512
if (login_connection(session))
514
/* Failed to log in */
515
libevent_connection_close(session);
520
/* login successful */
521
session->scheduler.logged_in= true;
522
prepare_new_connection_state(session);
523
if (!libevent_needs_immediate_processing(session))
524
continue; /* New connection is now waiting for data in libevent*/
530
/* Process a query */
531
if (do_command(session))
533
libevent_connection_close(session);
536
} while (libevent_needs_immediate_processing(session));
540
(void) pthread_mutex_lock(&LOCK_thread_count);
542
pthread_cond_broadcast(&COND_thread_count);
543
(void) pthread_mutex_unlock(&LOCK_thread_count);
546
return(0); /* purify: deadcode */
551
Returns true if the connection needs immediate processing and false if
552
instead it's queued for libevent processing or closed,
555
static bool libevent_needs_immediate_processing(Session *session)
557
if (libevent_should_close_connection(session))
559
libevent_connection_close(session);
563
If more data in the socket buffer, return true to process another command.
565
Note: we cannot add for event processing because the whole request might
566
already be buffered and we wouldn't receive an event.
568
if (net_more_data(&(session->net)))
571
session->scheduler.thread_detach();
572
libevent_session_add(session);
578
Adds a Session to queued for libevent processing.
580
This call does not actually register the event with libevent.
581
Instead, it places the Session onto a queue and signals libevent by writing
582
a byte into session_add_pipe, which will cause our libevent_add_session_callback to
583
be invoked which will find the Session on the queue and add it to libevent.
586
static void libevent_session_add(Session* session)
589
pthread_mutex_lock(&LOCK_session_add);
590
/* queue for libevent */
591
sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
592
/* notify libevent */
593
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
594
pthread_mutex_unlock(&LOCK_session_add);
599
Wait until all pool threads have been deleted for clean shutdown
602
static void libevent_end()
604
(void) pthread_mutex_lock(&LOCK_thread_count);
606
kill_pool_threads= true;
607
while (killed_threads != created_threads)
609
/* wake up the event loop */
611
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
613
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
615
(void) pthread_mutex_unlock(&LOCK_thread_count);
617
event_del(&session_add_event);
618
close(session_add_pipe[0]);
619
close(session_add_pipe[1]);
620
event_del(&session_kill_event);
621
close(session_kill_pipe[0]);
622
close(session_kill_pipe[1]);
624
(void) pthread_mutex_destroy(&LOCK_event_loop);
625
(void) pthread_mutex_destroy(&LOCK_session_add);
630
void pool_of_threads_scheduler(scheduler_functions* func)
632
func->max_threads= thread_pool_size;
633
func->init= libevent_init;
634
func->end= libevent_end;
635
func->post_kill_notification= libevent_post_kill_notification;
636
func->add_connection= libevent_add_connection;