20
20
#include <drizzled/server_includes.h>
21
21
#include <libdrizzle/libdrizzle.h>
23
#include <drizzled/gettext.h>
24
#include <drizzled/sql_parse.h>
25
#include <drizzled/scheduler.h>
26
#include <drizzled/session.h>
27
/* API for connecting, logging in to a drizzled server */
28
#include <drizzled/connect.h>
26
31
'Dummy' functions to be used when we don't need any handling for a scheduler
30
35
static bool init_dummy(void) {return 0;}
31
static void post_kill_dummy(THD *thd __attribute__((unused))) {}
36
static void post_kill_dummy(Session *session __attribute__((unused))) {}
32
37
static void end_dummy(void) {}
33
static bool end_thread_dummy(THD *thd __attribute__((unused)),
38
static bool end_thread_dummy(Session *session __attribute__((unused)),
34
39
bool cache_thread __attribute__((unused)))
50
55
static uint32_t created_threads, killed_threads;
51
56
static bool kill_pool_threads;
53
static struct event thd_add_event;
54
static struct event thd_kill_event;
56
static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
57
static LIST *thds_need_adding; /* list of thds to add to libevent queue */
59
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
60
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
58
static struct event session_add_event;
59
static struct event session_kill_event;
61
static pthread_mutex_t LOCK_session_add; /* protects sessions_need_adding */
62
static LIST *sessions_need_adding; /* list of sessions to add to libevent queue */
64
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
65
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
63
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
64
event_del) and thds_need_processing and thds_waiting_for_io.
68
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
69
event_del) and sessions_need_processing and sessions_waiting_for_io.
66
71
static pthread_mutex_t LOCK_event_loop;
67
static LIST *thds_need_processing; /* list of thds that needs some processing */
68
static LIST *thds_waiting_for_io; /* list of thds with added events */
72
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
73
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
70
75
pthread_handler_t libevent_thread_proc(void *arg);
71
76
static void libevent_end();
72
static bool libevent_needs_immediate_processing(THD *thd);
73
static void libevent_connection_close(THD *thd);
74
static bool libevent_should_close_connection(THD* thd);
75
static void libevent_thd_add(THD* thd);
77
static bool libevent_needs_immediate_processing(Session *session);
78
static void libevent_connection_close(Session *session);
79
static bool libevent_should_close_connection(Session* session);
80
static void libevent_session_add(Session* session);
76
81
void libevent_io_callback(int Fd, short Operation, void *ctx);
77
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
78
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
82
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
83
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
98
thd_scheduler keeps the link between THD and events.
99
It's embedded in the THD class.
103
session_scheduler keeps the link between Session and events.
104
It's embedded in the Session class.
102
thd_scheduler::thd_scheduler()
103
: logged_in(false), io_event(NULL), thread_attached(false)
105
dbug_explain_buf[0]= 0;
109
thd_scheduler::~thd_scheduler()
115
bool thd_scheduler::init(THD *parent_thd)
118
(struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
107
session_scheduler::session_scheduler()
108
: logged_in(false), io_event(NULL), thread_attached(false)
113
session_scheduler::~session_scheduler()
119
session_scheduler::session_scheduler(const session_scheduler&)
120
: logged_in(false), io_event(NULL), thread_attached(false)
123
void session_scheduler::operator=(const session_scheduler&)
126
bool session_scheduler::init(Session *parent_session)
128
io_event= new struct event;
130
if (io_event == NULL)
122
sql_print_error(_("Memory allocation error in thd_scheduler::init\n"));
132
errmsg_printf(ERRMSG_LVL_ERROR, _("Memory allocation error in session_scheduler::init\n"));
126
event_set(io_event, net_get_sd(&(parent_thd->net)), EV_READ,
127
libevent_io_callback, (void*)parent_thd);
129
list.data= parent_thd;
135
memset(io_event, 0, sizeof(*io_event));
137
event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
138
libevent_io_callback, (void*)parent_session);
140
list.data= parent_session;
136
147
Attach/associate the connection with the OS thread, for command processing.
139
bool thd_scheduler::thread_attach()
150
bool session_scheduler::thread_attach()
141
152
assert(!thread_attached);
142
THD* thd = (THD*)list.data;
143
if (libevent_should_close_connection(thd) ||
144
setup_connection_thread_globals(thd))
153
Session* session = (Session*)list.data;
154
if (libevent_should_close_connection(session) ||
155
setup_connection_thread_globals(session))
149
thd->mysys_var->abort= 0;
160
session->mysys_var->abort= 0;
150
161
thread_attached= true;
157
168
Detach/disassociate the connection with the OS thread.
160
void thd_scheduler::thread_detach()
171
void session_scheduler::thread_detach()
162
173
if (thread_attached)
164
THD* thd = (THD*)list.data;
165
thd->mysys_var= NULL;
175
Session* session = (Session*)list.data;
176
session->mysys_var= NULL;
166
177
thread_attached= false;
173
Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
175
This is used to preserve the SESSION DEBUG variable, which is mapped to the OS
176
thread during a command, but each command is handled by a different thread.
178
void thd_scheduler::swap_dbug_explain()
180
char buffer[sizeof(dbug_explain_buf)];
181
memcpy(dbug_explain_buf, buffer, sizeof(buffer));
185
182
Create all threads for the thread pool
203
200
created_threads= 0;
204
201
killed_threads= 0;
205
202
kill_pool_threads= false;
207
204
pthread_mutex_init(&LOCK_event_loop, NULL);
208
pthread_mutex_init(&LOCK_thd_add, NULL);
210
/* set up the pipe used to add new thds to the event pool */
211
if (init_pipe(thd_add_pipe))
213
sql_print_error(_("init_pipe(thd_add_pipe) error in libevent_init\n"));
216
/* set up the pipe used to kill thds in the event queue */
217
if (init_pipe(thd_kill_pipe))
219
sql_print_error(_("init_pipe(thd_kill_pipe) error in libevent_init\n"));
220
close(thd_add_pipe[0]);
221
close(thd_add_pipe[1]);
224
event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
225
libevent_add_thd_callback, NULL);
226
event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
227
libevent_kill_thd_callback, NULL);
229
if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
205
pthread_mutex_init(&LOCK_session_add, NULL);
207
/* set up the pipe used to add new sessions to the event pool */
208
if (init_pipe(session_add_pipe))
210
errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_add_pipe) error in libevent_init\n"));
213
/* set up the pipe used to kill sessions in the event queue */
214
if (init_pipe(session_kill_pipe))
216
errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_kill_pipe) error in libevent_init\n"));
217
close(session_add_pipe[0]);
218
close(session_add_pipe[1]);
221
event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
222
libevent_add_session_callback, NULL);
223
event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
224
libevent_kill_session_callback, NULL);
226
if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
231
sql_print_error(_("thd_add_event event_add error in libevent_init\n"));
228
errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
236
233
/* Set up the thread pool */
237
234
created_threads= killed_threads= 0;
244
241
if ((error= pthread_create(&thread, &connection_attrib,
245
242
libevent_thread_proc, 0)))
247
sql_print_error(_("Can't create completion port thread (error %d)"),
244
errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
249
246
pthread_mutex_unlock(&LOCK_thread_count);
250
247
libevent_end(); // Cleanup
256
253
while (created_threads != thread_pool_size)
257
254
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
258
255
pthread_mutex_unlock(&LOCK_thread_count);
265
262
This is called when data is ready on the socket.
268
265
This is only called by the thread that owns LOCK_event_loop.
270
We add the thd that got the data to thds_need_processing, and
267
We add the session that got the data to sessions_need_processing, and
271
268
cause the libevent event_loop() to terminate. Then this same thread will
272
return from event_loop and pick the thd value back up for processing.
269
return from event_loop and pick the session value back up for processing.
275
272
void libevent_io_callback(int, short, void *ctx)
277
274
safe_mutex_assert_owner(&LOCK_event_loop);
279
thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
280
thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
275
Session *session= (Session*)ctx;
276
sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
277
sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
284
281
This is called when we have a thread we want to be killed.
287
284
This is only called by the thread that owns LOCK_event_loop.
290
void libevent_kill_thd_callback(int Fd, short, void*)
287
void libevent_kill_session_callback(int Fd, short, void*)
292
289
safe_mutex_assert_owner(&LOCK_event_loop);
294
291
/* clear the pending events */
296
293
while (read(Fd, &c, sizeof(c)) == sizeof(c))
299
LIST* list= thds_waiting_for_io;
296
LIST* list= sessions_waiting_for_io;
302
THD *thd= (THD*)list->data;
299
Session *session= (Session*)list->data;
303
300
list= list_rest(list);
304
if (thd->killed == THD::KILL_CONNECTION)
301
if (session->killed == Session::KILL_CONNECTION)
307
304
Delete from libevent and add to the processing queue.
309
event_del(thd->scheduler.io_event);
310
thds_waiting_for_io= list_delete(thds_waiting_for_io,
311
&thd->scheduler.list);
312
thds_need_processing= list_add(thds_need_processing,
313
&thd->scheduler.list);
306
event_del(session->scheduler.io_event);
307
sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
308
&session->scheduler.list);
309
sessions_need_processing= list_add(sessions_need_processing,
310
&session->scheduler.list);
320
317
This is used to add connections to the pool. This callback is invoked from
321
the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
318
the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
325
322
This is only called by the thread that owns LOCK_event_loop.
328
void libevent_add_thd_callback(int Fd, short, void *)
325
void libevent_add_session_callback(int Fd, short, void *)
330
327
safe_mutex_assert_owner(&LOCK_event_loop);
332
329
/* clear the pending events */
334
331
while (read(Fd, &c, sizeof(c)) == sizeof(c))
337
pthread_mutex_lock(&LOCK_thd_add);
338
while (thds_need_adding)
334
pthread_mutex_lock(&LOCK_session_add);
335
while (sessions_need_adding)
340
/* pop the first thd off the list */
341
THD* thd= (THD*)thds_need_adding->data;
342
thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
344
pthread_mutex_unlock(&LOCK_thd_add);
346
if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
337
/* pop the first session off the list */
338
Session* session= (Session*)sessions_need_adding->data;
339
sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
341
pthread_mutex_unlock(&LOCK_session_add);
343
if (!session->scheduler.logged_in || libevent_should_close_connection(session))
349
Add thd to thds_need_processing list. If it needs closing we'll close
346
Add session to sessions_need_processing list. If it needs closing we'll close
350
347
it outside of event_loop().
352
thds_need_processing= list_add(thds_need_processing,
353
&thd->scheduler.list);
349
sessions_need_processing= list_add(sessions_need_processing,
350
&session->scheduler.list);
357
354
/* Add to libevent */
358
if (event_add(thd->scheduler.io_event, NULL))
355
if (event_add(session->scheduler.io_event, NULL))
360
sql_print_error(_("event_add error in libevent_add_thd_callback\n"));
361
libevent_connection_close(thd);
357
errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
358
libevent_connection_close(session);
365
thds_waiting_for_io= list_add(thds_waiting_for_io,
366
&thd->scheduler.list);
362
sessions_waiting_for_io= list_add(sessions_waiting_for_io,
363
&session->scheduler.list);
369
pthread_mutex_lock(&LOCK_thd_add);
366
pthread_mutex_lock(&LOCK_session_add);
371
pthread_mutex_unlock(&LOCK_thd_add);
368
pthread_mutex_unlock(&LOCK_session_add);
379
376
LOCK_thread_count is locked on entry. This function MUST unlock it!
382
static void libevent_add_connection(THD *thd)
379
static void libevent_add_connection(Session *session)
384
if (thd->scheduler.init(thd))
381
if (session->scheduler.init(session))
386
sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
383
errmsg_printf(ERRMSG_LVL_ERROR, _("Scheduler init error in libevent_add_new_connection\n"));
387
384
pthread_mutex_unlock(&LOCK_thread_count);
388
libevent_connection_close(thd);
385
libevent_connection_close(session);
392
libevent_thd_add(thd);
388
threads.append(session);
389
libevent_session_add(session);
394
391
pthread_mutex_unlock(&LOCK_thread_count);
400
397
@brief Signal a waiting connection it's time to die.
402
@details This function will signal libevent the THD should be killed.
403
Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
399
@details This function will signal libevent the Session should be killed.
400
Either the global LOCK_session_count or the Session's LOCK_delete must be locked
406
@param[in] thd The connection to kill
403
@param[in] session The connection to kill
409
static void libevent_post_kill_notification(THD *)
406
static void libevent_post_kill_notification(Session *)
412
Note, we just wake up libevent with an event that a THD should be killed,
413
It will search its list of thds for thd->killed == KILL_CONNECTION to
414
find the THDs it should kill.
409
Note, we just wake up libevent with an event that a Session should be killed,
410
It will search its list of sessions for session->killed == KILL_CONNECTION to
411
find the Sessions it should kill.
416
413
So we don't actually tell it which one and we don't actually use the
417
THD being passed to us, but that's just a design detail that could change
414
Session being passed to us, but that's just a design detail that could change
421
write(thd_kill_pipe[1], &c, sizeof(c));
418
assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
426
423
Close and delete a connection.
429
static void libevent_connection_close(THD *thd)
426
static void libevent_connection_close(Session *session)
431
thd->killed= THD::KILL_CONNECTION; // Avoid error messages
428
session->killed= Session::KILL_CONNECTION; // Avoid error messages
433
if (net_get_sd(&(thd->net)) >= 0) // not already closed
430
if (net_get_sd(&(session->net)) >= 0) // not already closed
436
close_connection(thd, 0, 1);
432
end_connection(session);
433
session->close_connection(0, 1);
438
thd->scheduler.thread_detach();
439
unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
435
session->scheduler.thread_detach();
436
unlink_session(session); /* locks LOCK_thread_count and deletes session */
440
437
pthread_mutex_unlock(&LOCK_thread_count);
447
Returns true if we should close and delete a THD connection.
444
Returns true if we should close and delete a Session connection.
450
static bool libevent_should_close_connection(THD* thd)
447
static bool libevent_should_close_connection(Session* session)
452
return net_should_close(&(thd->net)) ||
453
thd->killed == THD::KILL_CONNECTION;
449
return net_should_close(&(session->net)) ||
450
session->killed == Session::KILL_CONNECTION;
477
474
if (created_threads == thread_pool_size)
478
475
(void) pthread_cond_signal(&COND_thread_count);
479
476
(void) pthread_mutex_unlock(&LOCK_thread_count);
480
Session *session= NULL;
484
481
(void) pthread_mutex_lock(&LOCK_event_loop);
486
/* get thd(s) to process */
487
while (!thds_need_processing)
483
/* get session(s) to process */
484
while (!sessions_need_processing)
489
486
if (kill_pool_threads)
495
492
event_loop(EVLOOP_ONCE);
498
/* pop the first thd off the list */
499
thd= (THD*)thds_need_processing->data;
500
thds_need_processing= list_delete(thds_need_processing,
501
thds_need_processing);
495
/* pop the first session off the list */
496
session= (Session*)sessions_need_processing->data;
497
sessions_need_processing= list_delete(sessions_need_processing,
498
sessions_need_processing);
503
500
(void) pthread_mutex_unlock(&LOCK_event_loop);
505
/* now we process the connection (thd) */
507
/* set up the thd<->thread links. */
508
thd->thread_stack= (char*) &thd;
510
if (thd->scheduler.thread_attach())
502
/* now we process the connection (session) */
504
/* set up the session<->thread links. */
505
session->thread_stack= (char*) &session;
507
if (session->scheduler.thread_attach())
512
libevent_connection_close(thd);
509
libevent_connection_close(session);
516
513
/* is the connection logged in yet? */
517
if (!thd->scheduler.logged_in)
514
if (!session->scheduler.logged_in)
519
if (login_connection(thd))
516
if (login_connection(session))
521
518
/* Failed to log in */
522
libevent_connection_close(thd);
519
libevent_connection_close(session);
527
524
/* login successful */
528
thd->scheduler.logged_in= true;
529
prepare_new_connection_state(thd);
530
if (!libevent_needs_immediate_processing(thd))
525
session->scheduler.logged_in= true;
526
prepare_new_connection_state(session);
527
if (!libevent_needs_immediate_processing(session))
531
528
continue; /* New connection is now waiting for data in libevent*/
558
Returns true if the connection needs immediate processing and false if
555
Returns true if the connection needs immediate processing and false if
559
556
instead it's queued for libevent processing or closed,
562
static bool libevent_needs_immediate_processing(THD *thd)
559
static bool libevent_needs_immediate_processing(Session *session)
564
if (libevent_should_close_connection(thd))
561
if (libevent_should_close_connection(session))
566
libevent_connection_close(thd);
563
libevent_connection_close(session);
572
569
Note: we cannot add for event processing because the whole request might
573
570
already be buffered and we wouldn't receive an event.
575
if (net_more_data(&(thd->net)))
572
if (net_more_data(&(session->net)))
578
thd->scheduler.thread_detach();
579
libevent_thd_add(thd);
575
session->scheduler.thread_detach();
576
libevent_session_add(session);
585
Adds a THD to queued for libevent processing.
582
Adds a Session to queued for libevent processing.
587
584
This call does not actually register the event with libevent.
588
Instead, it places the THD onto a queue and signals libevent by writing
589
a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
590
be invoked which will find the THD on the queue and add it to libevent.
585
Instead, it places the Session onto a queue and signals libevent by writing
586
a byte into session_add_pipe, which will cause our libevent_add_session_callback to
587
be invoked which will find the Session on the queue and add it to libevent.
593
static void libevent_thd_add(THD* thd)
590
static void libevent_session_add(Session* session)
596
pthread_mutex_lock(&LOCK_thd_add);
593
pthread_mutex_lock(&LOCK_session_add);
597
594
/* queue for libevent */
598
thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
595
sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
599
596
/* notify libevent */
600
write(thd_add_pipe[1], &c, sizeof(c));
601
pthread_mutex_unlock(&LOCK_thd_add);
597
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
598
pthread_mutex_unlock(&LOCK_session_add);
609
606
static void libevent_end()
611
608
(void) pthread_mutex_lock(&LOCK_thread_count);
613
610
kill_pool_threads= true;
614
611
while (killed_threads != created_threads)
616
613
/* wake up the event loop */
618
write(thd_add_pipe[1], &c, sizeof(c));
615
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
620
617
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
622
619
(void) pthread_mutex_unlock(&LOCK_thread_count);
624
event_del(&thd_add_event);
625
close(thd_add_pipe[0]);
626
close(thd_add_pipe[1]);
627
event_del(&thd_kill_event);
628
close(thd_kill_pipe[0]);
629
close(thd_kill_pipe[1]);
621
event_del(&session_add_event);
622
close(session_add_pipe[0]);
623
close(session_add_pipe[1]);
624
event_del(&session_kill_event);
625
close(session_kill_pipe[0]);
626
close(session_kill_pipe[1]);
631
628
(void) pthread_mutex_destroy(&LOCK_event_loop);
632
(void) pthread_mutex_destroy(&LOCK_thd_add);
629
(void) pthread_mutex_destroy(&LOCK_session_add);