52
53
end_thread(end_thread_dummy), end(end_dummy)
55
static uint32_t created_threads, killed_threads;
56
static bool kill_pool_threads;
58
static struct event session_add_event;
59
static struct event session_kill_event;
61
static pthread_mutex_t LOCK_session_add; /* protects sessions_need_adding */
62
static LIST *sessions_need_adding= NULL; /* list of sessions to add to libevent queue */
64
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
65
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
68
LOCK_event_loop protects the non-thread safe libevent calls (event_add and
69
event_del) and sessions_need_processing and sessions_waiting_for_io.
71
static pthread_mutex_t LOCK_event_loop;
72
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
73
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
75
pthread_handler_t libevent_thread_proc(void *arg);
76
static void libevent_end();
77
static bool libevent_needs_immediate_processing(Session *session);
78
static void libevent_connection_close(Session *session);
79
static bool libevent_should_close_connection(Session* session);
80
static void libevent_session_add(Session* session);
81
void libevent_io_callback(int Fd, short Operation, void *ctx);
82
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
83
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
87
Create a pipe and set to non-blocking.
88
Returns true if there is an error.
91
static bool init_pipe(int pipe_fds[])
94
return pipe(pipe_fds) < 0 ||
95
(flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
96
fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
97
(flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
98
fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
103
session_scheduler keeps the link between Session and events.
104
It's embedded in the Session class.
107
session_scheduler::session_scheduler()
108
: logged_in(false), io_event(NULL), thread_attached(false)
113
session_scheduler::~session_scheduler()
119
session_scheduler::session_scheduler(const session_scheduler&)
120
: logged_in(false), io_event(NULL), thread_attached(false)
123
void session_scheduler::operator=(const session_scheduler&)
126
bool session_scheduler::init(Session *parent_session)
128
io_event= new struct event;
130
if (io_event == NULL)
132
errmsg_printf(ERRMSG_LVL_ERROR, _("Memory allocation error in session_scheduler::init\n"));
135
memset(io_event, 0, sizeof(*io_event));
137
event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
138
libevent_io_callback, (void*)parent_session);
140
list.data= parent_session;
147
Attach/associate the connection with the OS thread, for command processing.
150
bool session_scheduler::thread_attach()
152
assert(!thread_attached);
153
Session* session = (Session*)list.data;
154
if (libevent_should_close_connection(session) ||
155
setup_connection_thread_globals(session))
160
session->mysys_var->abort= 0;
161
thread_attached= true;
168
Detach/disassociate the connection with the OS thread.
171
void session_scheduler::thread_detach()
175
Session* session = (Session*)list.data;
176
session->mysys_var= NULL;
177
thread_attached= false;
182
Create all threads for the thread pool
185
After threads are created we wait until all threads has signaled that
186
they have started before we return
190
1 We got an error creating the thread pool
191
In this case we will abort all created threads
194
static bool libevent_init(void)
202
kill_pool_threads= false;
204
pthread_mutex_init(&LOCK_event_loop, NULL);
205
pthread_mutex_init(&LOCK_session_add, NULL);
207
/* set up the pipe used to add new sessions to the event pool */
208
if (init_pipe(session_add_pipe))
210
errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_add_pipe) error in libevent_init\n"));
213
/* set up the pipe used to kill sessions in the event queue */
214
if (init_pipe(session_kill_pipe))
216
errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_kill_pipe) error in libevent_init\n"));
217
close(session_add_pipe[0]);
218
close(session_add_pipe[1]);
221
event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
222
libevent_add_session_callback, NULL);
223
event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
224
libevent_kill_session_callback, NULL);
226
if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
228
errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
233
/* Set up the thread pool */
234
created_threads= killed_threads= 0;
235
pthread_mutex_lock(&LOCK_thread_count);
237
for (i= 0; i < thread_pool_size; i++)
241
if ((error= pthread_create(&thread, &connection_attrib,
242
libevent_thread_proc, 0)))
244
errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
246
pthread_mutex_unlock(&LOCK_thread_count);
247
libevent_end(); // Cleanup
252
/* Wait until all threads are created */
253
while (created_threads != thread_pool_size)
254
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
255
pthread_mutex_unlock(&LOCK_thread_count);
262
This is called when data is ready on the socket.
265
This is only called by the thread that owns LOCK_event_loop.
267
We add the session that got the data to sessions_need_processing, and
268
cause the libevent event_loop() to terminate. Then this same thread will
269
return from event_loop and pick the session value back up for processing.
272
void libevent_io_callback(int, short, void *ctx)
274
safe_mutex_assert_owner(&LOCK_event_loop);
275
Session *session= (Session*)ctx;
276
sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
277
sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
281
This is called when we have a thread we want to be killed.
284
This is only called by the thread that owns LOCK_event_loop.
287
void libevent_kill_session_callback(int Fd, short, void*)
289
safe_mutex_assert_owner(&LOCK_event_loop);
291
/* clear the pending events */
293
while (read(Fd, &c, sizeof(c)) == sizeof(c))
296
LIST* list= sessions_waiting_for_io;
299
Session *session= (Session*)list->data;
300
list= list_rest(list);
301
if (session->killed == Session::KILL_CONNECTION)
304
Delete from libevent and add to the processing queue.
306
event_del(session->scheduler.io_event);
307
sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
308
&session->scheduler.list);
309
sessions_need_processing= list_add(sessions_need_processing,
310
&session->scheduler.list);
317
This is used to add connections to the pool. This callback is invoked from
318
the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
322
This is only called by the thread that owns LOCK_event_loop.
325
void libevent_add_session_callback(int Fd, short, void *)
327
safe_mutex_assert_owner(&LOCK_event_loop);
329
/* clear the pending events */
331
while (read(Fd, &c, sizeof(c)) == sizeof(c))
334
pthread_mutex_lock(&LOCK_session_add);
335
while (sessions_need_adding)
337
/* pop the first session off the list */
338
Session* session= (Session*)sessions_need_adding->data;
339
sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
341
pthread_mutex_unlock(&LOCK_session_add);
343
if (!session->scheduler.logged_in || libevent_should_close_connection(session))
346
Add session to sessions_need_processing list. If it needs closing we'll close
347
it outside of event_loop().
349
sessions_need_processing= list_add(sessions_need_processing,
350
&session->scheduler.list);
354
/* Add to libevent */
355
if (event_add(session->scheduler.io_event, NULL))
357
errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
358
libevent_connection_close(session);
362
sessions_waiting_for_io= list_add(sessions_waiting_for_io,
363
&session->scheduler.list);
366
pthread_mutex_lock(&LOCK_session_add);
368
pthread_mutex_unlock(&LOCK_session_add);
373
Notify the thread pool about a new connection
376
LOCK_thread_count is locked on entry. This function MUST unlock it!
379
static void libevent_add_connection(Session *session)
381
if (session->scheduler.init(session))
383
errmsg_printf(ERRMSG_LVL_ERROR, _("Scheduler init error in libevent_add_new_connection\n"));
384
pthread_mutex_unlock(&LOCK_thread_count);
385
libevent_connection_close(session);
388
threads.append(session);
389
libevent_session_add(session);
391
pthread_mutex_unlock(&LOCK_thread_count);
397
@brief Signal a waiting connection it's time to die.
399
@details This function will signal libevent the Session should be killed.
400
Either the global LOCK_session_count or the Session's LOCK_delete must be locked
403
@param[in] session The connection to kill
406
static void libevent_post_kill_notification(Session *)
409
Note, we just wake up libevent with an event that a Session should be killed,
410
It will search its list of sessions for session->killed == KILL_CONNECTION to
411
find the Sessions it should kill.
413
So we don't actually tell it which one and we don't actually use the
414
Session being passed to us, but that's just a design detail that could change
418
assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
423
Close and delete a connection.
426
static void libevent_connection_close(Session *session)
428
session->killed= Session::KILL_CONNECTION; // Avoid error messages
430
if (net_get_sd(&(session->net)) >= 0) // not already closed
432
end_connection(session);
433
session->close_connection(0, 1);
435
session->scheduler.thread_detach();
436
unlink_session(session); /* locks LOCK_thread_count and deletes session */
437
pthread_mutex_unlock(&LOCK_thread_count);
444
Returns true if we should close and delete a Session connection.
447
static bool libevent_should_close_connection(Session* session)
449
return net_should_close(&(session->net)) ||
450
session->killed == Session::KILL_CONNECTION;
455
libevent_thread_proc is the outer loop of each thread in the thread pool.
456
These procs only return/terminate on shutdown (kill_pool_threads == true).
459
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
461
if (init_new_connection_handler_thread())
463
my_thread_global_end();
464
errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: my_thread_init() failed\n"));
469
Signal libevent_init() when all threads has been created and are ready to
472
(void) pthread_mutex_lock(&LOCK_thread_count);
474
if (created_threads == thread_pool_size)
475
(void) pthread_cond_signal(&COND_thread_count);
476
(void) pthread_mutex_unlock(&LOCK_thread_count);
480
Session *session= NULL;
481
(void) pthread_mutex_lock(&LOCK_event_loop);
483
/* get session(s) to process */
484
while (!sessions_need_processing)
486
if (kill_pool_threads)
488
/* the flag that we should die has been set */
489
(void) pthread_mutex_unlock(&LOCK_event_loop);
492
event_loop(EVLOOP_ONCE);
495
/* pop the first session off the list */
496
session= (Session*)sessions_need_processing->data;
497
sessions_need_processing= list_delete(sessions_need_processing,
498
sessions_need_processing);
500
(void) pthread_mutex_unlock(&LOCK_event_loop);
502
/* now we process the connection (session) */
504
/* set up the session<->thread links. */
505
session->thread_stack= (char*) &session;
507
if (session->scheduler.thread_attach())
509
libevent_connection_close(session);
513
/* is the connection logged in yet? */
514
if (!session->scheduler.logged_in)
516
if (login_connection(session))
518
/* Failed to log in */
519
libevent_connection_close(session);
524
/* login successful */
525
session->scheduler.logged_in= true;
526
prepare_new_connection_state(session);
527
if (!libevent_needs_immediate_processing(session))
528
continue; /* New connection is now waiting for data in libevent*/
534
/* Process a query */
535
if (do_command(session))
537
libevent_connection_close(session);
540
} while (libevent_needs_immediate_processing(session));
544
(void) pthread_mutex_lock(&LOCK_thread_count);
546
pthread_cond_broadcast(&COND_thread_count);
547
(void) pthread_mutex_unlock(&LOCK_thread_count);
550
return(0); /* purify: deadcode */
555
Returns true if the connection needs immediate processing and false if
556
instead it's queued for libevent processing or closed,
559
static bool libevent_needs_immediate_processing(Session *session)
561
if (libevent_should_close_connection(session))
563
libevent_connection_close(session);
567
If more data in the socket buffer, return true to process another command.
569
Note: we cannot add for event processing because the whole request might
570
already be buffered and we wouldn't receive an event.
572
if (net_more_data(&(session->net)))
575
session->scheduler.thread_detach();
576
libevent_session_add(session);
582
Adds a Session to queued for libevent processing.
584
This call does not actually register the event with libevent.
585
Instead, it places the Session onto a queue and signals libevent by writing
586
a byte into session_add_pipe, which will cause our libevent_add_session_callback to
587
be invoked which will find the Session on the queue and add it to libevent.
590
static void libevent_session_add(Session* session)
593
pthread_mutex_lock(&LOCK_session_add);
594
/* queue for libevent */
595
sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
596
/* notify libevent */
597
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
598
pthread_mutex_unlock(&LOCK_session_add);
603
Wait until all pool threads have been deleted for clean shutdown
606
static void libevent_end()
608
(void) pthread_mutex_lock(&LOCK_thread_count);
610
kill_pool_threads= true;
611
while (killed_threads != created_threads)
613
/* wake up the event loop */
615
assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
617
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
619
(void) pthread_mutex_unlock(&LOCK_thread_count);
621
event_del(&session_add_event);
622
close(session_add_pipe[0]);
623
close(session_add_pipe[1]);
624
event_del(&session_kill_event);
625
close(session_kill_pipe[0]);
626
close(session_kill_pipe[1]);
628
(void) pthread_mutex_destroy(&LOCK_event_loop);
629
(void) pthread_mutex_destroy(&LOCK_session_add);
634
void pool_of_threads_scheduler(scheduler_functions* func)
636
func->max_threads= thread_pool_size;
637
func->init= libevent_init;
638
func->end= libevent_end;
639
func->post_kill_notification= libevent_post_kill_notification;
640
func->add_connection= libevent_add_connection;