17
17
#include <drizzled/gettext.h>
18
18
#include <drizzled/error.h>
19
19
#include <drizzled/plugin/scheduler.h>
20
#include <drizzled/connect.h>
21
20
#include <drizzled/sql_parse.h>
22
21
#include <drizzled/session.h>
23
22
#include "session_scheduler.h"
97
97
safe_mutex_assert_owner(&LOCK_event_loop);
98
98
Session *session= (Session*)ctx;
99
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
99
session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
100
100
assert(scheduler);
101
101
sessions_waiting_for_io.remove(scheduler->session);
102
102
sessions_need_processing.push_front(scheduler->session);
138
138
Session *session= *it;
139
139
if (session->killed == Session::KILL_CONNECTION)
141
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
141
session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
142
142
assert(scheduler);
144
144
Delete from libevent and add to the processing queue.
183
183
/* pop the first session off the list */
184
184
Session* session= sessions_need_adding.front();
185
185
sessions_need_adding.pop_front();
186
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
186
session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
187
187
assert(scheduler);
189
189
pthread_mutex_unlock(&LOCK_session_add);
218
class Pool_of_threads_scheduler: public Scheduler
218
class PoolOfThreadsScheduler: public plugin::Scheduler
221
pthread_attr_t thread_attrib;
224
Pool_of_threads_scheduler(uint32_t max_size_in)
225
: Scheduler(max_size_in)
224
PoolOfThreadsScheduler(): Scheduler()
227
/* Parameter for threads created for connections */
228
(void) pthread_attr_init(&thread_attrib);
229
(void) pthread_attr_setdetachstate(&thread_attrib,
230
PTHREAD_CREATE_DETACHED);
231
pthread_attr_setscope(&thread_attrib, PTHREAD_SCOPE_SYSTEM);
233
struct sched_param tmp_sched_param;
235
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
236
tmp_sched_param.sched_priority= WAIT_PRIOR;
237
(void)pthread_attr_setschedparam(&thread_attrib, &tmp_sched_param);
226
struct sched_param tmp_sched_param;
228
/* Setup attribute parameter for session threads. */
229
(void) pthread_attr_init(&attr);
230
(void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
231
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
232
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
233
tmp_sched_param.sched_priority= WAIT_PRIOR;
234
(void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
241
~Pool_of_threads_scheduler()
237
~PoolOfThreadsScheduler()
243
239
(void) pthread_mutex_lock(&LOCK_thread_count);
272
269
LOCK_thread_count is locked on entry. This function MUST unlock it!
275
virtual bool add_connection(Session *session)
272
virtual bool addSession(Session *session)
277
assert(session->scheduler == NULL);
274
assert(session->scheduler_arg == NULL);
278
275
session_scheduler *scheduler= new session_scheduler(session);
280
277
if (scheduler == NULL)
283
session->scheduler= (void *)scheduler;
280
session->scheduler_arg= (void *)scheduler;
285
282
libevent_session_add(session);
298
295
@param[in] session The connection to kill
301
virtual void post_kill_notification(Session *)
298
virtual void killSession(Session *)
304
301
Note, we just wake up libevent with an event that a Session should be killed,
376
368
pthread_t thread;
378
if ((error= pthread_create(&thread, &thread_attrib, libevent_thread_proc, 0)))
370
if ((error= pthread_create(&thread, &attr, libevent_thread_proc, 0)))
380
372
errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
397
class PoolOfThreadsFactory : public SchedulerFactory
389
class PoolOfThreadsFactory : public plugin::SchedulerFactory
400
392
/* Construct the factory, handing the name "pool_of_threads" to the SchedulerFactory base. */
PoolOfThreadsFactory()
  : SchedulerFactory("pool_of_threads")
{
}
401
393
/* Destroy the scheduler pointed to by `scheduler`; delete on a null pointer is a no-op, so no guard is needed. */
~PoolOfThreadsFactory()
{
  delete scheduler;
}
402
Scheduler *operator() ()
394
plugin::Scheduler *operator() ()
404
396
if (scheduler == NULL)
406
Pool_of_threads_scheduler *pot= new Pool_of_threads_scheduler(size);
398
PoolOfThreadsScheduler *pot= new PoolOfThreadsScheduler();
407
399
if (pot->libevent_init())
422
414
static void libevent_connection_close(Session *session)
424
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
416
session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
425
417
assert(scheduler);
426
418
session->killed= Session::KILL_CONNECTION; // Avoid error messages
495
487
/* pop the first session off the list */
496
488
session= sessions_need_processing.front();
497
489
sessions_need_processing.pop_front();
498
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
490
session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
500
492
(void) pthread_mutex_unlock(&LOCK_event_loop);
513
505
/* is the connection logged in yet? */
514
506
if (!scheduler->logged_in)
516
if (! session->authenticate())
508
if (session->authenticate())
518
510
/* Failed to log in */
519
511
libevent_connection_close(session);
563
555
static bool libevent_needs_immediate_processing(Session *session)
565
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
557
session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
567
559
if (libevent_should_close_connection(session))
597
589
void libevent_session_add(Session* session)
600
session_scheduler *scheduler= (session_scheduler *)session->scheduler;
592
session_scheduler *scheduler= (session_scheduler *)session->scheduler_arg;
601
593
assert(scheduler);
603
595
pthread_mutex_lock(&LOCK_session_add);