30
30
static bool init_dummy(void) {return 0;}
31
static void post_kill_dummy(Session *thd __attribute__((unused))) {}
31
static void post_kill_dummy(Session *session __attribute__((unused))) {}
32
32
static void end_dummy(void) {}
33
static bool end_thread_dummy(Session *thd __attribute__((unused)),
33
static bool end_thread_dummy(Session *session __attribute__((unused)),
34
34
bool cache_thread __attribute__((unused)))
50
50
static uint32_t created_threads, killed_threads;
51
51
static bool kill_pool_threads;
53
static struct event thd_add_event;
54
static struct event thd_kill_event;
56
static pthread_mutex_t LOCK_thd_add; /* protects thds_need_adding */
57
static LIST *thds_need_adding; /* list of thds to add to libevent queue */
59
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
60
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
53
static struct event session_add_event;
54
static struct event session_kill_event;
56
static pthread_mutex_t LOCK_session_add; /* protects sessions_need_adding */
57
static LIST *sessions_need_adding; /* list of sessions to add to libevent queue */
59
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
60
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
63
63
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
64
event_del) and thds_need_processing and thds_waiting_for_io.
64
event_del) and sessions_need_processing and sessions_waiting_for_io.
66
66
static pthread_mutex_t LOCK_event_loop;
67
static LIST *thds_need_processing; /* list of thds that needs some processing */
68
static LIST *thds_waiting_for_io; /* list of thds with added events */
67
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
68
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
70
70
pthread_handler_t libevent_thread_proc(void *arg);
71
71
static void libevent_end();
72
static bool libevent_needs_immediate_processing(Session *thd);
73
static void libevent_connection_close(Session *thd);
74
static bool libevent_should_close_connection(Session* thd);
75
static void libevent_thd_add(Session* thd);
72
static bool libevent_needs_immediate_processing(Session *session);
73
static void libevent_connection_close(Session *session);
74
static bool libevent_should_close_connection(Session* session);
75
static void libevent_session_add(Session* session);
76
76
void libevent_io_callback(int Fd, short Operation, void *ctx);
77
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
78
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
77
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
78
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
98
thd_scheduler keeps the link between Session and events.
98
session_scheduler keeps the link between Session and events.
99
99
It's embedded in the Session class.
102
thd_scheduler::thd_scheduler()
102
session_scheduler::session_scheduler()
103
103
: logged_in(false), io_event(NULL), thread_attached(false)
108
thd_scheduler::~thd_scheduler()
108
session_scheduler::~session_scheduler()
114
thd_scheduler::thd_scheduler(const thd_scheduler&)
114
session_scheduler::session_scheduler(const session_scheduler&)
115
115
: logged_in(false), io_event(NULL), thread_attached(false)
118
void thd_scheduler::operator=(const thd_scheduler&)
118
void session_scheduler::operator=(const session_scheduler&)
121
bool thd_scheduler::init(Session *parent_thd)
121
bool session_scheduler::init(Session *parent_session)
124
124
(struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
128
sql_print_error(_("Memory allocation error in thd_scheduler::init\n"));
128
sql_print_error(_("Memory allocation error in session_scheduler::init\n"));
132
event_set(io_event, net_get_sd(&(parent_thd->net)), EV_READ,
133
libevent_io_callback, (void*)parent_thd);
132
event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
133
libevent_io_callback, (void*)parent_session);
135
list.data= parent_thd;
135
list.data= parent_session;
142
142
Attach/associate the connection with the OS thread, for command processing.
145
bool thd_scheduler::thread_attach()
145
bool session_scheduler::thread_attach()
147
147
assert(!thread_attached);
148
Session* thd = (Session*)list.data;
149
if (libevent_should_close_connection(thd) ||
150
setup_connection_thread_globals(thd))
148
Session* session = (Session*)list.data;
149
if (libevent_should_close_connection(session) ||
150
setup_connection_thread_globals(session))
155
thd->mysys_var->abort= 0;
155
session->mysys_var->abort= 0;
156
156
thread_attached= true;
163
163
Detach/disassociate the connection with the OS thread.
166
void thd_scheduler::thread_detach()
166
void session_scheduler::thread_detach()
168
168
if (thread_attached)
170
Session* thd = (Session*)list.data;
171
thd->mysys_var= NULL;
170
Session* session = (Session*)list.data;
171
session->mysys_var= NULL;
172
172
thread_attached= false;
197
197
kill_pool_threads= false;
199
199
pthread_mutex_init(&LOCK_event_loop, NULL);
200
pthread_mutex_init(&LOCK_thd_add, NULL);
200
pthread_mutex_init(&LOCK_session_add, NULL);
202
/* set up the pipe used to add new thds to the event pool */
203
if (init_pipe(thd_add_pipe))
205
sql_print_error(_("init_pipe(thd_add_pipe) error in libevent_init\n"));
208
/* set up the pipe used to kill thds in the event queue */
209
if (init_pipe(thd_kill_pipe))
211
sql_print_error(_("init_pipe(thd_kill_pipe) error in libevent_init\n"));
212
close(thd_add_pipe[0]);
213
close(thd_add_pipe[1]);
216
event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
217
libevent_add_thd_callback, NULL);
218
event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
219
libevent_kill_thd_callback, NULL);
202
/* set up the pipe used to add new sessions to the event pool */
203
if (init_pipe(session_add_pipe))
205
sql_print_error(_("init_pipe(session_add_pipe) error in libevent_init\n"));
208
/* set up the pipe used to kill sessions in the event queue */
209
if (init_pipe(session_kill_pipe))
211
sql_print_error(_("init_pipe(session_kill_pipe) error in libevent_init\n"));
212
close(session_add_pipe[0]);
213
close(session_add_pipe[1]);
216
event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
217
libevent_add_session_callback, NULL);
218
event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
219
libevent_kill_session_callback, NULL);
221
if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
221
if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
223
sql_print_error(_("thd_add_event event_add error in libevent_init\n"));
223
sql_print_error(_("session_add_event event_add error in libevent_init\n"));
260
260
This is only called by the thread that owns LOCK_event_loop.
262
We add the thd that got the data to thds_need_processing, and
262
We add the session that got the data to sessions_need_processing, and
263
263
cause the libevent event_loop() to terminate. Then this same thread will
264
return from event_loop and pick the thd value back up for processing.
264
return from event_loop and pick the session value back up for processing.
267
267
void libevent_io_callback(int, short, void *ctx)
269
269
safe_mutex_assert_owner(&LOCK_event_loop);
270
Session *thd= (Session*)ctx;
271
thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
272
thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
270
Session *session= (Session*)ctx;
271
sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
272
sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
279
279
This is only called by the thread that owns LOCK_event_loop.
282
void libevent_kill_thd_callback(int Fd, short, void*)
282
void libevent_kill_session_callback(int Fd, short, void*)
284
284
safe_mutex_assert_owner(&LOCK_event_loop);
288
288
while (read(Fd, &c, sizeof(c)) == sizeof(c))
291
LIST* list= thds_waiting_for_io;
291
LIST* list= sessions_waiting_for_io;
294
Session *thd= (Session*)list->data;
294
Session *session= (Session*)list->data;
295
295
list= list_rest(list);
296
if (thd->killed == Session::KILL_CONNECTION)
296
if (session->killed == Session::KILL_CONNECTION)
299
299
Delete from libevent and add to the processing queue.
301
event_del(thd->scheduler.io_event);
302
thds_waiting_for_io= list_delete(thds_waiting_for_io,
303
&thd->scheduler.list);
304
thds_need_processing= list_add(thds_need_processing,
305
&thd->scheduler.list);
301
event_del(session->scheduler.io_event);
302
sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
303
&session->scheduler.list);
304
sessions_need_processing= list_add(sessions_need_processing,
305
&session->scheduler.list);
312
312
This is used to add connections to the pool. This callback is invoked from
313
the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
313
the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
317
317
This is only called by the thread that owns LOCK_event_loop.
320
void libevent_add_thd_callback(int Fd, short, void *)
320
void libevent_add_session_callback(int Fd, short, void *)
322
322
safe_mutex_assert_owner(&LOCK_event_loop);
326
326
while (read(Fd, &c, sizeof(c)) == sizeof(c))
329
pthread_mutex_lock(&LOCK_thd_add);
330
while (thds_need_adding)
329
pthread_mutex_lock(&LOCK_session_add);
330
while (sessions_need_adding)
332
/* pop the first thd off the list */
333
Session* thd= (Session*)thds_need_adding->data;
334
thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
332
/* pop the first session off the list */
333
Session* session= (Session*)sessions_need_adding->data;
334
sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
336
pthread_mutex_unlock(&LOCK_thd_add);
336
pthread_mutex_unlock(&LOCK_session_add);
338
if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
338
if (!session->scheduler.logged_in || libevent_should_close_connection(session))
341
Add thd to thds_need_processing list. If it needs closing we'll close
341
Add session to sessions_need_processing list. If it needs closing we'll close
342
342
it outside of event_loop().
344
thds_need_processing= list_add(thds_need_processing,
345
&thd->scheduler.list);
344
sessions_need_processing= list_add(sessions_need_processing,
345
&session->scheduler.list);
349
349
/* Add to libevent */
350
if (event_add(thd->scheduler.io_event, NULL))
350
if (event_add(session->scheduler.io_event, NULL))
352
sql_print_error(_("event_add error in libevent_add_thd_callback\n"));
353
libevent_connection_close(thd);
352
sql_print_error(_("event_add error in libevent_add_session_callback\n"));
353
libevent_connection_close(session);
357
thds_waiting_for_io= list_add(thds_waiting_for_io,
358
&thd->scheduler.list);
357
sessions_waiting_for_io= list_add(sessions_waiting_for_io,
358
&session->scheduler.list);
361
pthread_mutex_lock(&LOCK_thd_add);
361
pthread_mutex_lock(&LOCK_session_add);
363
pthread_mutex_unlock(&LOCK_thd_add);
363
pthread_mutex_unlock(&LOCK_session_add);
371
371
LOCK_thread_count is locked on entry. This function MUST unlock it!
374
static void libevent_add_connection(Session *thd)
374
static void libevent_add_connection(Session *session)
376
if (thd->scheduler.init(thd))
376
if (session->scheduler.init(session))
378
378
sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
379
379
pthread_mutex_unlock(&LOCK_thread_count);
380
libevent_connection_close(thd);
380
libevent_connection_close(session);
384
libevent_thd_add(thd);
383
threads.append(session);
384
libevent_session_add(session);
386
386
pthread_mutex_unlock(&LOCK_thread_count);
392
392
@brief Signal a waiting connection it's time to die.
394
394
@details This function will signal libevent the Session should be killed.
395
Either the global LOCK_thd_count or the Session's LOCK_delete must be locked
395
Either the global LOCK_session_count or the Session's LOCK_delete must be locked
398
@param[in] thd The connection to kill
398
@param[in] session The connection to kill
401
401
static void libevent_post_kill_notification(Session *)
404
404
Note, we just wake up libevent with an event that a Session should be killed,
405
It will search its list of thds for thd->killed == KILL_CONNECTION to
405
It will search its list of sessions for session->killed == KILL_CONNECTION to
406
406
find the Sessions it should kill.
408
408
So we don't actually tell it which one and we don't actually use the
418
418
Close and delete a connection.
421
static void libevent_connection_close(Session *thd)
421
static void libevent_connection_close(Session *session)
423
thd->killed= Session::KILL_CONNECTION; // Avoid error messages
423
session->killed= Session::KILL_CONNECTION; // Avoid error messages
425
if (net_get_sd(&(thd->net)) >= 0) // not already closed
425
if (net_get_sd(&(session->net)) >= 0) // not already closed
428
close_connection(thd, 0, 1);
427
end_connection(session);
428
close_connection(session, 0, 1);
430
thd->scheduler.thread_detach();
431
unlink_thd(thd); /* locks LOCK_thread_count and deletes thd */
430
session->scheduler.thread_detach();
431
unlink_session(session); /* locks LOCK_thread_count and deletes session */
432
432
pthread_mutex_unlock(&LOCK_thread_count);
439
439
Returns true if we should close and delete a Session connection.
442
static bool libevent_should_close_connection(Session* thd)
442
static bool libevent_should_close_connection(Session* session)
444
return net_should_close(&(thd->net)) ||
445
thd->killed == Session::KILL_CONNECTION;
444
return net_should_close(&(session->net)) ||
445
session->killed == Session::KILL_CONNECTION;
487
487
event_loop(EVLOOP_ONCE);
490
/* pop the first thd off the list */
491
thd= (Session*)thds_need_processing->data;
492
thds_need_processing= list_delete(thds_need_processing,
493
thds_need_processing);
490
/* pop the first session off the list */
491
session= (Session*)sessions_need_processing->data;
492
sessions_need_processing= list_delete(sessions_need_processing,
493
sessions_need_processing);
495
495
(void) pthread_mutex_unlock(&LOCK_event_loop);
497
/* now we process the connection (thd) */
499
/* set up the thd<->thread links. */
500
thd->thread_stack= (char*) &thd;
502
if (thd->scheduler.thread_attach())
497
/* now we process the connection (session) */
499
/* set up the session<->thread links. */
500
session->thread_stack= (char*) &session;
502
if (session->scheduler.thread_attach())
504
libevent_connection_close(thd);
504
libevent_connection_close(session);
508
508
/* is the connection logged in yet? */
509
if (!thd->scheduler.logged_in)
509
if (!session->scheduler.logged_in)
511
if (login_connection(thd))
511
if (login_connection(session))
513
513
/* Failed to log in */
514
libevent_connection_close(thd);
514
libevent_connection_close(session);
519
519
/* login successful */
520
thd->scheduler.logged_in= true;
521
prepare_new_connection_state(thd);
522
if (!libevent_needs_immediate_processing(thd))
520
session->scheduler.logged_in= true;
521
prepare_new_connection_state(session);
522
if (!libevent_needs_immediate_processing(session))
523
523
continue; /* New connection is now waiting for data in libevent*/
551
551
instead it's queued for libevent processing or closed,
554
static bool libevent_needs_immediate_processing(Session *thd)
554
static bool libevent_needs_immediate_processing(Session *session)
556
if (libevent_should_close_connection(thd))
556
if (libevent_should_close_connection(session))
558
libevent_connection_close(thd);
558
libevent_connection_close(session);
564
564
Note: we cannot add for event processing because the whole request might
565
565
already be buffered and we wouldn't receive an event.
567
if (net_more_data(&(thd->net)))
567
if (net_more_data(&(session->net)))
570
thd->scheduler.thread_detach();
571
libevent_thd_add(thd);
570
session->scheduler.thread_detach();
571
libevent_session_add(session);
579
579
This call does not actually register the event with libevent.
580
580
Instead, it places the Session onto a queue and signals libevent by writing
581
a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
581
a byte into session_add_pipe, which will cause our libevent_add_session_callback to
582
582
be invoked which will find the Session on the queue and add it to libevent.
585
static void libevent_thd_add(Session* thd)
585
static void libevent_session_add(Session* session)
588
pthread_mutex_lock(&LOCK_thd_add);
588
pthread_mutex_lock(&LOCK_session_add);
589
589
/* queue for libevent */
590
thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
590
sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
591
591
/* notify libevent */
592
assert(write(thd_add_pipe[1], &c, sizeof(c))==sizeof(c));
593
pthread_mutex_unlock(&LOCK_thd_add);
592
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
593
pthread_mutex_unlock(&LOCK_session_add);
608
608
/* wake up the event loop */
610
assert(write(thd_add_pipe[1], &c, sizeof(c))==sizeof(c));
610
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
612
612
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
614
614
(void) pthread_mutex_unlock(&LOCK_thread_count);
616
event_del(&thd_add_event);
617
close(thd_add_pipe[0]);
618
close(thd_add_pipe[1]);
619
event_del(&thd_kill_event);
620
close(thd_kill_pipe[0]);
621
close(thd_kill_pipe[1]);
616
event_del(&session_add_event);
617
close(session_add_pipe[0]);
618
close(session_add_pipe[1]);
619
event_del(&session_kill_event);
620
close(session_kill_pipe[0]);
621
close(session_kill_pipe[1]);
623
623
(void) pthread_mutex_destroy(&LOCK_event_loop);
624
(void) pthread_mutex_destroy(&LOCK_thd_add);
624
(void) pthread_mutex_destroy(&LOCK_session_add);