20
20
#include <drizzled/server_includes.h>
21
#include <libdrizzle/libdrizzle.h>
23
#include <drizzled/gettext.h>
24
#include <drizzled/sql_parse.h>
25
#include <drizzled/scheduler.h>
26
#include <drizzled/session.h>
27
/* API for connecting, logging in to a drizzled server */
28
#include <drizzled/connect.h>
25
31
'Dummy' functions to be used when we don't need any handling for a scheduler
29
35
static bool init_dummy(void) {return 0;}
30
static void post_kill_dummy(THD *thd __attribute__((unused))) {}
36
static void post_kill_dummy(Session *session __attribute__((unused))) {}
31
37
static void end_dummy(void) {}
32
static bool end_thread_dummy(THD *thd __attribute__((unused)),
38
static bool end_thread_dummy(Session *session __attribute__((unused)),
33
39
bool cache_thread __attribute__((unused)))
46
52
end_thread(end_thread_dummy), end(end_dummy)
49
static uint created_threads, killed_threads;
55
static uint32_t created_threads, killed_threads;
50
56
static bool kill_pool_threads;
52
static struct event thd_add_event;
53
static struct event thd_kill_event;
55
static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
56
static LIST *thds_need_adding; /* list of thds to add to libevent queue */
58
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
59
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
58
static struct event session_add_event;
59
static struct event session_kill_event;
61
static pthread_mutex_t LOCK_session_add; /* protects sessions_need_adding */
62
static LIST *sessions_need_adding; /* list of sessions to add to libevent queue */
64
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
65
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
62
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
63
event_del) and thds_need_processing and thds_waiting_for_io.
68
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
69
event_del) and sessions_need_processing and sessions_waiting_for_io.
65
71
static pthread_mutex_t LOCK_event_loop;
66
static LIST *thds_need_processing; /* list of thds that needs some processing */
67
static LIST *thds_waiting_for_io; /* list of thds with added events */
72
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
73
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
69
75
pthread_handler_t libevent_thread_proc(void *arg);
70
76
static void libevent_end();
71
static bool libevent_needs_immediate_processing(THD *thd);
72
static void libevent_connection_close(THD *thd);
73
static bool libevent_should_close_connection(THD* thd);
74
static void libevent_thd_add(THD* thd);
77
static bool libevent_needs_immediate_processing(Session *session);
78
static void libevent_connection_close(Session *session);
79
static bool libevent_should_close_connection(Session* session);
80
static void libevent_session_add(Session* session);
75
81
void libevent_io_callback(int Fd, short Operation, void *ctx);
76
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
77
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
82
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
83
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
97
thd_scheduler keeps the link between THD and events.
98
It's embedded in the THD class.
103
session_scheduler keeps the link between Session and events.
104
It's embedded in the Session class.
101
thd_scheduler::thd_scheduler()
102
: logged_in(false), io_event(NULL), thread_attached(false)
104
dbug_explain_buf[0]= 0;
108
thd_scheduler::~thd_scheduler()
110
my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
114
bool thd_scheduler::init(THD *parent_thd)
117
(struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
107
session_scheduler::session_scheduler()
108
: logged_in(false), io_event(NULL), thread_attached(false)
113
session_scheduler::~session_scheduler()
119
session_scheduler::session_scheduler(const session_scheduler&)
120
: logged_in(false), io_event(NULL), thread_attached(false)
123
void session_scheduler::operator=(const session_scheduler&)
126
bool session_scheduler::init(Session *parent_session)
128
io_event= new struct event;
130
if (io_event == NULL)
121
sql_print_error("Memory allocation error in thd_scheduler::init\n");
132
errmsg_printf(ERRMSG_LVL_ERROR, _("Memory allocation error in session_scheduler::init\n"));
125
event_set(io_event, parent_thd->net.vio->sd, EV_READ,
126
libevent_io_callback, (void*)parent_thd);
128
list.data= parent_thd;
135
memset(io_event, 0, sizeof(*io_event));
137
event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
138
libevent_io_callback, (void*)parent_session);
140
list.data= parent_session;
135
147
Attach/associate the connection with the OS thread, for command processing.
138
bool thd_scheduler::thread_attach()
150
bool session_scheduler::thread_attach()
140
152
assert(!thread_attached);
141
THD* thd = (THD*)list.data;
142
if (libevent_should_close_connection(thd) ||
143
setup_connection_thread_globals(thd))
153
Session* session = (Session*)list.data;
154
if (libevent_should_close_connection(session) ||
155
setup_connection_thread_globals(session))
148
thd->mysys_var->abort= 0;
160
session->mysys_var->abort= 0;
149
161
thread_attached= true;
156
168
Detach/disassociate the connection with the OS thread.
159
void thd_scheduler::thread_detach()
171
void session_scheduler::thread_detach()
161
173
if (thread_attached)
163
THD* thd = (THD*)list.data;
164
thd->mysys_var= NULL;
175
Session* session = (Session*)list.data;
176
session->mysys_var= NULL;
165
177
thread_attached= false;
172
Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
174
This is used to preserve the SESSION DEBUG variable, which is mapped to the OS
175
thread during a command, but each command is handled by a different thread.
177
void thd_scheduler::swap_dbug_explain()
179
char buffer[sizeof(dbug_explain_buf)];
180
memcpy(dbug_explain_buf, buffer, sizeof(buffer));
184
182
Create all threads for the thread pool
196
194
static bool libevent_init(void)
202
200
created_threads= 0;
203
201
killed_threads= 0;
204
202
kill_pool_threads= false;
206
204
pthread_mutex_init(&LOCK_event_loop, NULL);
207
pthread_mutex_init(&LOCK_thd_add, NULL);
209
/* set up the pipe used to add new thds to the event pool */
210
if (init_pipe(thd_add_pipe))
212
sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
215
/* set up the pipe used to kill thds in the event queue */
216
if (init_pipe(thd_kill_pipe))
218
sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
219
close(thd_add_pipe[0]);
220
close(thd_add_pipe[1]);
223
event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
224
libevent_add_thd_callback, NULL);
225
event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
226
libevent_kill_thd_callback, NULL);
228
if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
205
pthread_mutex_init(&LOCK_session_add, NULL);
207
/* set up the pipe used to add new sessions to the event pool */
208
if (init_pipe(session_add_pipe))
210
errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_add_pipe) error in libevent_init\n"));
213
/* set up the pipe used to kill sessions in the event queue */
214
if (init_pipe(session_kill_pipe))
216
errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_kill_pipe) error in libevent_init\n"));
217
close(session_add_pipe[0]);
218
close(session_add_pipe[1]);
221
event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
222
libevent_add_session_callback, NULL);
223
event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
224
libevent_kill_session_callback, NULL);
226
if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
230
sql_print_error("thd_add_event event_add error in libevent_init\n");
228
errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
235
233
/* Set up the thread pool */
236
234
created_threads= killed_threads= 0;
243
241
if ((error= pthread_create(&thread, &connection_attrib,
244
242
libevent_thread_proc, 0)))
246
sql_print_error("Can't create completion port thread (error %d)",
244
errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
248
246
pthread_mutex_unlock(&LOCK_thread_count);
249
247
libevent_end(); // Cleanup
255
253
while (created_threads != thread_pool_size)
256
254
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
257
255
pthread_mutex_unlock(&LOCK_thread_count);
264
262
This is called when data is ready on the socket.
267
265
This is only called by the thread that owns LOCK_event_loop.
269
We add the thd that got the data to thds_need_processing, and
267
We add the session that got the data to sessions_need_processing, and
270
268
cause the libevent event_loop() to terminate. Then this same thread will
271
return from event_loop and pick the thd value back up for processing.
269
return from event_loop and pick the session value back up for processing.
274
272
void libevent_io_callback(int, short, void *ctx)
276
274
safe_mutex_assert_owner(&LOCK_event_loop);
278
thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
279
thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
275
Session *session= (Session*)ctx;
276
sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
277
sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
283
281
This is called when we have a thread we want to be killed.
286
284
This is only called by the thread that owns LOCK_event_loop.
289
void libevent_kill_thd_callback(int Fd, short, void*)
287
void libevent_kill_session_callback(int Fd, short, void*)
291
289
safe_mutex_assert_owner(&LOCK_event_loop);
293
291
/* clear the pending events */
295
293
while (read(Fd, &c, sizeof(c)) == sizeof(c))
298
LIST* list= thds_waiting_for_io;
296
LIST* list= sessions_waiting_for_io;
301
THD *thd= (THD*)list->data;
299
Session *session= (Session*)list->data;
302
300
list= list_rest(list);
303
if (thd->killed == THD::KILL_CONNECTION)
301
if (session->killed == Session::KILL_CONNECTION)
306
304
Delete from libevent and add to the processing queue.
308
event_del(thd->scheduler.io_event);
309
thds_waiting_for_io= list_delete(thds_waiting_for_io,
310
&thd->scheduler.list);
311
thds_need_processing= list_add(thds_need_processing,
312
&thd->scheduler.list);
306
event_del(session->scheduler.io_event);
307
sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
308
&session->scheduler.list);
309
sessions_need_processing= list_add(sessions_need_processing,
310
&session->scheduler.list);
319
317
This is used to add connections to the pool. This callback is invoked from
320
the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
318
the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
324
322
This is only called by the thread that owns LOCK_event_loop.
327
void libevent_add_thd_callback(int Fd, short, void *)
325
void libevent_add_session_callback(int Fd, short, void *)
329
327
safe_mutex_assert_owner(&LOCK_event_loop);
331
329
/* clear the pending events */
333
331
while (read(Fd, &c, sizeof(c)) == sizeof(c))
336
pthread_mutex_lock(&LOCK_thd_add);
337
while (thds_need_adding)
334
pthread_mutex_lock(&LOCK_session_add);
335
while (sessions_need_adding)
339
/* pop the first thd off the list */
340
THD* thd= (THD*)thds_need_adding->data;
341
thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
343
pthread_mutex_unlock(&LOCK_thd_add);
345
if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
337
/* pop the first session off the list */
338
Session* session= (Session*)sessions_need_adding->data;
339
sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
341
pthread_mutex_unlock(&LOCK_session_add);
343
if (!session->scheduler.logged_in || libevent_should_close_connection(session))
348
Add thd to thds_need_processing list. If it needs closing we'll close
346
Add session to sessions_need_processing list. If it needs closing we'll close
349
347
it outside of event_loop().
351
thds_need_processing= list_add(thds_need_processing,
352
&thd->scheduler.list);
349
sessions_need_processing= list_add(sessions_need_processing,
350
&session->scheduler.list);
356
354
/* Add to libevent */
357
if (event_add(thd->scheduler.io_event, NULL))
355
if (event_add(session->scheduler.io_event, NULL))
359
sql_print_error("event_add error in libevent_add_thd_callback\n");
360
libevent_connection_close(thd);
357
errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
358
libevent_connection_close(session);
364
thds_waiting_for_io= list_add(thds_waiting_for_io,
365
&thd->scheduler.list);
362
sessions_waiting_for_io= list_add(sessions_waiting_for_io,
363
&session->scheduler.list);
368
pthread_mutex_lock(&LOCK_thd_add);
366
pthread_mutex_lock(&LOCK_session_add);
370
pthread_mutex_unlock(&LOCK_thd_add);
368
pthread_mutex_unlock(&LOCK_session_add);
378
376
LOCK_thread_count is locked on entry. This function MUST unlock it!
381
static void libevent_add_connection(THD *thd)
379
static void libevent_add_connection(Session *session)
383
if (thd->scheduler.init(thd))
381
if (session->scheduler.init(session))
385
sql_print_error("Scheduler init error in libevent_add_new_connection\n");
383
errmsg_printf(ERRMSG_LVL_ERROR, _("Scheduler init error in libevent_add_new_connection\n"));
386
384
pthread_mutex_unlock(&LOCK_thread_count);
387
libevent_connection_close(thd);
385
libevent_connection_close(session);
391
libevent_thd_add(thd);
388
threads.append(session);
389
libevent_session_add(session);
393
391
pthread_mutex_unlock(&LOCK_thread_count);
399
397
@brief Signal a waiting connection it's time to die.
401
@details This function will signal libevent the THD should be killed.
402
Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
399
@details This function will signal libevent the Session should be killed.
400
Either the global LOCK_session_count or the Session's LOCK_delete must be locked
405
@param[in] thd The connection to kill
403
@param[in] session The connection to kill
408
static void libevent_post_kill_notification(THD *)
406
static void libevent_post_kill_notification(Session *)
411
Note, we just wake up libevent with an event that a THD should be killed,
412
It will search its list of thds for thd->killed == KILL_CONNECTION to
413
find the THDs it should kill.
409
Note, we just wake up libevent with an event that a Session should be killed,
410
It will search its list of sessions for session->killed == KILL_CONNECTION to
411
find the Sessions it should kill.
415
413
So we don't actually tell it which one and we don't actually use the
416
THD being passed to us, but that's just a design detail that could change
414
Session being passed to us, but that's just a design detail that could change
420
write(thd_kill_pipe[1], &c, sizeof(c));
418
assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
425
423
Close and delete a connection.
428
static void libevent_connection_close(THD *thd)
426
static void libevent_connection_close(Session *session)
430
thd->killed= THD::KILL_CONNECTION; // Avoid error messages
428
session->killed= Session::KILL_CONNECTION; // Avoid error messages
432
if (thd->net.vio->sd >= 0) // not already closed
430
if (net_get_sd(&(session->net)) >= 0) // not already closed
435
close_connection(thd, 0, 1);
432
end_connection(session);
433
session->close_connection(0, 1);
437
thd->scheduler.thread_detach();
438
unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
435
session->scheduler.thread_detach();
436
unlink_session(session); /* locks LOCK_thread_count and deletes session */
439
437
pthread_mutex_unlock(&LOCK_thread_count);
446
Returns true if we should close and delete a THD connection.
444
Returns true if we should close and delete a Session connection.
449
static bool libevent_should_close_connection(THD* thd)
447
static bool libevent_should_close_connection(Session* session)
451
return thd->net.error ||
453
thd->killed == THD::KILL_CONNECTION;
449
return net_should_close(&(session->net)) ||
450
session->killed == Session::KILL_CONNECTION;
477
474
if (created_threads == thread_pool_size)
478
475
(void) pthread_cond_signal(&COND_thread_count);
479
476
(void) pthread_mutex_unlock(&LOCK_thread_count);
480
Session *session= NULL;
484
481
(void) pthread_mutex_lock(&LOCK_event_loop);
486
/* get thd(s) to process */
487
while (!thds_need_processing)
483
/* get session(s) to process */
484
while (!sessions_need_processing)
489
486
if (kill_pool_threads)
495
492
event_loop(EVLOOP_ONCE);
498
/* pop the first thd off the list */
499
thd= (THD*)thds_need_processing->data;
500
thds_need_processing= list_delete(thds_need_processing,
501
thds_need_processing);
495
/* pop the first session off the list */
496
session= (Session*)sessions_need_processing->data;
497
sessions_need_processing= list_delete(sessions_need_processing,
498
sessions_need_processing);
503
500
(void) pthread_mutex_unlock(&LOCK_event_loop);
505
/* now we process the connection (thd) */
507
/* set up the thd<->thread links. */
508
thd->thread_stack= (char*) &thd;
510
if (thd->scheduler.thread_attach())
502
/* now we process the connection (session) */
504
/* set up the session<->thread links. */
505
session->thread_stack= (char*) &session;
507
if (session->scheduler.thread_attach())
512
libevent_connection_close(thd);
509
libevent_connection_close(session);
516
513
/* is the connection logged in yet? */
517
if (!thd->scheduler.logged_in)
514
if (!session->scheduler.logged_in)
519
if (login_connection(thd))
516
if (login_connection(session))
521
518
/* Failed to log in */
522
libevent_connection_close(thd);
519
libevent_connection_close(session);
527
524
/* login successful */
528
thd->scheduler.logged_in= true;
529
prepare_new_connection_state(thd);
530
if (!libevent_needs_immediate_processing(thd))
525
session->scheduler.logged_in= true;
526
prepare_new_connection_state(session);
527
if (!libevent_needs_immediate_processing(session))
531
528
continue; /* New connection is now waiting for data in libevent*/
558
Returns true if the connection needs immediate processing and false if
555
Returns true if the connection needs immediate processing and false if
559
556
instead it's queued for libevent processing or closed,
562
static bool libevent_needs_immediate_processing(THD *thd)
559
static bool libevent_needs_immediate_processing(Session *session)
564
if (libevent_should_close_connection(thd))
561
if (libevent_should_close_connection(session))
566
libevent_connection_close(thd);
563
libevent_connection_close(session);
572
569
Note: we cannot add for event processing because the whole request might
573
570
already be buffered and we wouldn't receive an event.
575
if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
572
if (net_more_data(&(session->net)))
578
thd->scheduler.thread_detach();
579
libevent_thd_add(thd);
575
session->scheduler.thread_detach();
576
libevent_session_add(session);
585
Adds a THD to queued for libevent processing.
582
Adds a Session to queued for libevent processing.
587
584
This call does not actually register the event with libevent.
588
Instead, it places the THD onto a queue and signals libevent by writing
589
a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
590
be invoked which will find the THD on the queue and add it to libevent.
585
Instead, it places the Session onto a queue and signals libevent by writing
586
a byte into session_add_pipe, which will cause our libevent_add_session_callback to
587
be invoked which will find the Session on the queue and add it to libevent.
593
static void libevent_thd_add(THD* thd)
590
static void libevent_session_add(Session* session)
596
pthread_mutex_lock(&LOCK_thd_add);
593
pthread_mutex_lock(&LOCK_session_add);
597
594
/* queue for libevent */
598
thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
595
sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
599
596
/* notify libevent */
600
write(thd_add_pipe[1], &c, sizeof(c));
601
pthread_mutex_unlock(&LOCK_thd_add);
597
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
598
pthread_mutex_unlock(&LOCK_session_add);
609
606
static void libevent_end()
611
608
(void) pthread_mutex_lock(&LOCK_thread_count);
613
610
kill_pool_threads= true;
614
611
while (killed_threads != created_threads)
616
613
/* wake up the event loop */
618
write(thd_add_pipe[1], &c, sizeof(c));
615
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
620
617
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
622
619
(void) pthread_mutex_unlock(&LOCK_thread_count);
624
event_del(&thd_add_event);
625
close(thd_add_pipe[0]);
626
close(thd_add_pipe[1]);
627
event_del(&thd_kill_event);
628
close(thd_kill_pipe[0]);
629
close(thd_kill_pipe[1]);
621
event_del(&session_add_event);
622
close(session_add_pipe[0]);
623
close(session_add_pipe[1]);
624
event_del(&session_kill_event);
625
close(session_kill_pipe[0]);
626
close(session_kill_pipe[1]);
631
628
(void) pthread_mutex_destroy(&LOCK_event_loop);
632
(void) pthread_mutex_destroy(&LOCK_thd_add);
629
(void) pthread_mutex_destroy(&LOCK_session_add);