1
/* Copyright (C) 2006 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
#include <drizzled/server_includes.h>
17
#include <drizzled/gettext.h>
18
#include <drizzled/error.h>
19
#include <drizzled/plugin_scheduling.h>
20
#include <drizzled/serialize/serialize.h>
21
#include <drizzled/connect.h>
22
#include <drizzled/sql_parse.h>
23
#include <drizzled/session.h>
24
#include "session_scheduler.h"
29
static uint32_t created_threads, killed_threads;
30
static bool kill_pool_threads;
32
static struct event session_add_event;
33
static struct event session_kill_event;
35
static pthread_mutex_t LOCK_session_add; /* protects sessions_need_adding */
36
static LIST *sessions_need_adding= NULL; /* list of sessions to add to libevent queue */
38
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
39
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
42
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
43
event_del) and sessions_need_processing and sessions_waiting_for_io.
45
static pthread_mutex_t LOCK_event_loop;
46
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
47
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
49
pthread_handler_t libevent_thread_proc(void *arg);
50
static void libevent_end();
51
static bool libevent_needs_immediate_processing(Session *session);
52
static void libevent_connection_close(Session *session);
53
void libevent_session_add(Session* session);
54
bool libevent_should_close_connection(Session* session);
55
void libevent_io_callback(int Fd, short Operation, void *ctx);
56
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
57
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
59
static uint32_t size= 0;
62
Create a pipe and set to non-blocking.
63
Returns true if there is an error.
66
static bool init_pipe(int pipe_fds[])
69
return pipe(pipe_fds) < 0 ||
70
(flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
71
fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
72
(flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
73
fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
79
Create all threads for the thread pool
82
After threads are created we wait until all threads has signaled that
83
they have started before we return
87
1 We got an error creating the thread pool
88
In this case we will abort all created threads
91
static bool libevent_init(void)
99
kill_pool_threads= false;
101
pthread_mutex_init(&LOCK_event_loop, NULL);
102
pthread_mutex_init(&LOCK_session_add, NULL);
104
/* set up the pipe used to add new sessions to the event pool */
105
if (init_pipe(session_add_pipe))
107
errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_add_pipe) error in libevent_init\n"));
110
/* set up the pipe used to kill sessions in the event queue */
111
if (init_pipe(session_kill_pipe))
113
errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_kill_pipe) error in libevent_init\n"));
114
close(session_add_pipe[0]);
115
close(session_add_pipe[1]);
118
event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
119
libevent_add_session_callback, NULL);
120
event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
121
libevent_kill_session_callback, NULL);
123
if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
125
errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
130
/* Set up the thread pool */
131
created_threads= killed_threads= 0;
132
pthread_mutex_lock(&LOCK_thread_count);
134
for (i= 0; i < thread_pool_size; i++)
138
if ((error= pthread_create(&thread, &connection_attrib,
139
libevent_thread_proc, 0)))
141
errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
143
pthread_mutex_unlock(&LOCK_thread_count);
144
libevent_end(); // Cleanup
149
/* Wait until all threads are created */
150
while (created_threads != thread_pool_size)
151
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
152
pthread_mutex_unlock(&LOCK_thread_count);
159
This is called when data is ready on the socket.
162
This is only called by the thread that owns LOCK_event_loop.
164
We add the session that got the data to sessions_need_processing, and
165
cause the libevent event_loop() to terminate. Then this same thread will
166
return from event_loop and pick the session value back up for processing.
169
void libevent_io_callback(int, short, void *ctx)
171
safe_mutex_assert_owner(&LOCK_event_loop);
172
Session *session= (Session*)ctx;
173
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
175
sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &scheduler->list);
176
sessions_need_processing= list_add(sessions_need_processing, &scheduler->list);
180
This is called when we have a thread we want to be killed.
183
This is only called by the thread that owns LOCK_event_loop.
186
void libevent_kill_session_callback(int Fd, short, void*)
188
safe_mutex_assert_owner(&LOCK_event_loop);
190
/* clear the pending events */
192
while (read(Fd, &c, sizeof(c)) == sizeof(c))
195
LIST* list= sessions_waiting_for_io;
198
Session *session= (Session*)list->data;
199
list= list_rest(list);
200
if (session->killed == Session::KILL_CONNECTION)
202
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
205
Delete from libevent and add to the processing queue.
207
event_del(scheduler->io_event);
208
sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
210
sessions_need_processing= list_add(sessions_need_processing,
218
This is used to add connections to the pool. This callback is invoked from
219
the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
223
This is only called by the thread that owns LOCK_event_loop.
226
void libevent_add_session_callback(int Fd, short, void *)
228
safe_mutex_assert_owner(&LOCK_event_loop);
230
/* clear the pending events */
232
while (read(Fd, &c, sizeof(c)) == sizeof(c))
235
pthread_mutex_lock(&LOCK_session_add);
236
while (sessions_need_adding)
238
/* pop the first session off the list */
239
Session* session= (Session*)sessions_need_adding->data;
240
sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
241
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
244
pthread_mutex_unlock(&LOCK_session_add);
246
if (!scheduler->logged_in || libevent_should_close_connection(session))
249
Add session to sessions_need_processing list. If it needs closing we'll close
250
it outside of event_loop().
252
sessions_need_processing= list_add(sessions_need_processing,
257
/* Add to libevent */
258
if (event_add(scheduler->io_event, NULL))
260
errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
261
libevent_connection_close(session);
265
sessions_waiting_for_io= list_add(sessions_waiting_for_io,
269
pthread_mutex_lock(&LOCK_session_add);
271
pthread_mutex_unlock(&LOCK_session_add);
276
Notify the thread pool about a new connection
279
LOCK_thread_count is locked on entry. This function MUST unlock it!
282
static void libevent_add_connection(Session *session)
284
assert(session->scheduler == NULL);
285
session_scheduler *scheduler= new session_scheduler;
287
session->scheduler= (void *)scheduler;
289
if (scheduler->init(session))
291
errmsg_printf(ERRMSG_LVL_ERROR, _("Scheduler init error in libevent_add_new_connection\n"));
292
pthread_mutex_unlock(&LOCK_thread_count);
293
libevent_connection_close(session);
297
threads.append(session);
298
libevent_session_add(session);
300
pthread_mutex_unlock(&LOCK_thread_count);
306
@brief Signal a waiting connection it's time to die.
308
@details This function will signal libevent the Session should be killed.
309
Either the global LOCK_session_count or the Session's LOCK_delete must be locked
312
@param[in] session The connection to kill
315
static void libevent_post_kill_notification(Session *)
318
Note, we just wake up libevent with an event that a Session should be killed,
319
It will search its list of sessions for session->killed == KILL_CONNECTION to
320
find the Sessions it should kill.
322
So we don't actually tell it which one and we don't actually use the
323
Session being passed to us, but that's just a design detail that could change
327
assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
332
Close and delete a connection.
335
static void libevent_connection_close(Session *session)
337
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
339
session->killed= Session::KILL_CONNECTION; // Avoid error messages
341
if (net_get_sd(&(session->net)) >= 0) // not already closed
343
end_connection(session);
344
session->close_connection(0, 1);
346
scheduler->thread_detach();
349
session->scheduler= NULL;
351
unlink_session(session); /* locks LOCK_thread_count and deletes session */
352
pthread_mutex_unlock(&LOCK_thread_count);
359
Returns true if we should close and delete a Session connection.
362
bool libevent_should_close_connection(Session* session)
364
return net_should_close(&(session->net)) ||
365
session->killed == Session::KILL_CONNECTION;
370
libevent_thread_proc is the outer loop of each thread in the thread pool.
371
These procs only return/terminate on shutdown (kill_pool_threads == true).
374
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
376
if (init_new_connection_handler_thread())
378
my_thread_global_end();
379
errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: my_thread_init() failed\n"));
384
Signal libevent_init() when all threads has been created and are ready to
387
(void) pthread_mutex_lock(&LOCK_thread_count);
389
if (created_threads == thread_pool_size)
390
(void) pthread_cond_signal(&COND_thread_count);
391
(void) pthread_mutex_unlock(&LOCK_thread_count);
395
Session *session= NULL;
396
(void) pthread_mutex_lock(&LOCK_event_loop);
398
/* get session(s) to process */
399
while (!sessions_need_processing)
401
if (kill_pool_threads)
403
/* the flag that we should die has been set */
404
(void) pthread_mutex_unlock(&LOCK_event_loop);
407
event_loop(EVLOOP_ONCE);
410
/* pop the first session off the list */
411
session= (Session*)sessions_need_processing->data;
412
sessions_need_processing= list_delete(sessions_need_processing,
413
sessions_need_processing);
414
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
416
(void) pthread_mutex_unlock(&LOCK_event_loop);
418
/* now we process the connection (session) */
420
/* set up the session<->thread links. */
421
session->thread_stack= (char*) &session;
423
if (scheduler->thread_attach())
425
libevent_connection_close(session);
429
/* is the connection logged in yet? */
430
if (!scheduler->logged_in)
432
if (login_connection(session))
434
/* Failed to log in */
435
libevent_connection_close(session);
440
/* login successful */
441
scheduler->logged_in= true;
442
prepare_new_connection_state(session);
443
if (!libevent_needs_immediate_processing(session))
444
continue; /* New connection is now waiting for data in libevent*/
450
/* Process a query */
451
if (do_command(session))
453
libevent_connection_close(session);
456
} while (libevent_needs_immediate_processing(session));
460
(void) pthread_mutex_lock(&LOCK_thread_count);
462
pthread_cond_broadcast(&COND_thread_count);
463
(void) pthread_mutex_unlock(&LOCK_thread_count);
466
return(0); /* purify: deadcode */
471
Returns true if the connection needs immediate processing and false if
472
instead it's queued for libevent processing or closed,
475
static bool libevent_needs_immediate_processing(Session *session)
477
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
479
if (libevent_should_close_connection(session))
481
libevent_connection_close(session);
485
If more data in the socket buffer, return true to process another command.
487
Note: we cannot add for event processing because the whole request might
488
already be buffered and we wouldn't receive an event.
490
if (net_more_data(&(session->net)))
493
scheduler->thread_detach();
494
libevent_session_add(session);
501
Adds a Session to queued for libevent processing.
503
This call does not actually register the event with libevent.
504
Instead, it places the Session onto a queue and signals libevent by writing
505
a byte into session_add_pipe, which will cause our libevent_add_session_callback to
506
be invoked which will find the Session on the queue and add it to libevent.
509
void libevent_session_add(Session* session)
512
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
515
pthread_mutex_lock(&LOCK_session_add);
516
/* queue for libevent */
517
sessions_need_adding= list_add(sessions_need_adding, &scheduler->list);
518
/* notify libevent */
519
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
520
pthread_mutex_unlock(&LOCK_session_add);
525
Wait until all pool threads have been deleted for clean shutdown
528
static void libevent_end()
530
(void) pthread_mutex_lock(&LOCK_thread_count);
532
kill_pool_threads= true;
533
while (killed_threads != created_threads)
535
/* wake up the event loop */
537
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
539
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
541
(void) pthread_mutex_unlock(&LOCK_thread_count);
543
event_del(&session_add_event);
544
close(session_add_pipe[0]);
545
close(session_add_pipe[1]);
546
event_del(&session_kill_event);
547
close(session_kill_pipe[0]);
548
close(session_kill_pipe[1]);
550
(void) pthread_mutex_destroy(&LOCK_event_loop);
551
(void) pthread_mutex_destroy(&LOCK_session_add);
555
static int init(void *p)
557
scheduler_functions* func= (scheduler_functions *)p;
560
func->max_threads= size;
561
func->init= libevent_init;
562
func->end= libevent_end;
563
func->post_kill_notification= libevent_post_kill_notification;
564
func->add_connection= libevent_add_connection;
569
static int deinit(void *)
575
The defaults here were picked based on what I see (aka Brian). They should
576
be vetted across a larger audience.
578
static DRIZZLE_SYSVAR_UINT(size, size,
581
NULL, NULL, 8, 1, 1024, 0);
583
static struct st_mysql_sys_var* system_variables[]= {
584
DRIZZLE_SYSVAR(size),
588
mysql_declare_plugin(pool_of_threads)
590
DRIZZLE_SCHEDULING_PLUGIN,
594
"Pool of Threads Scheduler",
596
init, /* Plugin Init */
597
deinit, /* Plugin Deinit */
598
NULL, /* status variables */
599
system_variables, /* system variables */
600
NULL /* config options */
602
mysql_declare_plugin_end;