~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/scheduler.cc

  • Committer: Mark Atwood
  • Date: 2009-01-05 04:37:22 UTC
  • mto: (758.1.1 devel)
  • mto: This revision was merged to the branch mainline in revision 759.
  • Revision ID: me@mark.atwood.name-20090105043722-03l4mzcxi4yjjjih
replace sql_print_error etc with errmsg_print

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
 
20
20
#include <drizzled/server_includes.h>
21
21
#include <libdrizzle/libdrizzle.h>
22
 
#include "event.h"
23
 
 
 
22
#include <event.h>
 
23
#include <drizzled/gettext.h>
 
24
#include <drizzled/sql_parse.h>
 
25
#include <drizzled/scheduler.h>
 
26
#include <drizzled/session.h>
 
27
/* API for connecting, logging in to a drizzled server */
 
28
#include <drizzled/connect.h>
24
29
 
25
30
/*
26
31
  'Dummy' functions to be used when we don't need any handling for a scheduler
28
33
 */
29
34
 
30
35
static bool init_dummy(void) {return 0;}
31
 
static void post_kill_dummy(THD *thd __attribute__((unused))) {}
 
36
static void post_kill_dummy(Session *session __attribute__((unused))) {}
32
37
static void end_dummy(void) {}
33
 
static bool end_thread_dummy(THD *thd __attribute__((unused)),
 
38
static bool end_thread_dummy(Session *session __attribute__((unused)),
34
39
                             bool cache_thread __attribute__((unused)))
35
40
{ return 0; }
36
41
 
50
55
static uint32_t created_threads, killed_threads;
51
56
static bool kill_pool_threads;
52
57
 
53
 
static struct event thd_add_event;
54
 
static struct event thd_kill_event;
55
 
 
56
 
static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
57
 
static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
58
 
 
59
 
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
60
 
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
58
static struct event session_add_event;
 
59
static struct event session_kill_event;
 
60
 
 
61
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
 
62
static LIST *sessions_need_adding;    /* list of sessions to add to libevent queue */
 
63
 
 
64
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
65
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
61
66
 
62
67
/*
63
 
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
64
 
  event_del) and thds_need_processing and thds_waiting_for_io.
 
68
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and
 
69
  event_del) and sessions_need_processing and sessions_waiting_for_io.
65
70
*/
66
71
static pthread_mutex_t LOCK_event_loop;
67
 
static LIST *thds_need_processing; /* list of thds that needs some processing */
68
 
static LIST *thds_waiting_for_io; /* list of thds with added events */
 
72
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
 
73
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
69
74
 
70
75
pthread_handler_t libevent_thread_proc(void *arg);
71
76
static void libevent_end();
72
 
static bool libevent_needs_immediate_processing(THD *thd);
73
 
static void libevent_connection_close(THD *thd);
74
 
static bool libevent_should_close_connection(THD* thd);
75
 
static void libevent_thd_add(THD* thd);
 
77
static bool libevent_needs_immediate_processing(Session *session);
 
78
static void libevent_connection_close(Session *session);
 
79
static bool libevent_should_close_connection(Session* session);
 
80
static void libevent_session_add(Session* session);
76
81
void libevent_io_callback(int Fd, short Operation, void *ctx);
77
 
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
78
 
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
 
82
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
 
83
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
79
84
 
80
85
 
81
86
/*
95
100
 
96
101
 
97
102
/*
98
 
  thd_scheduler keeps the link between THD and events.
99
 
  It's embedded in the THD class.
 
103
  session_scheduler keeps the link between Session and events.
 
104
  It's embedded in the Session class.
100
105
*/
101
106
 
102
 
thd_scheduler::thd_scheduler()
103
 
  : logged_in(false), io_event(NULL), thread_attached(false)
104
 
{  
105
 
  dbug_explain_buf[0]= 0;
106
 
}
107
 
 
108
 
 
109
 
thd_scheduler::~thd_scheduler()
110
 
{
111
 
  free(io_event);
112
 
}
113
 
 
114
 
 
115
 
bool thd_scheduler::init(THD *parent_thd)
116
 
{
117
 
  io_event=
118
 
    (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
119
 
    
120
 
  if (!io_event)
 
107
session_scheduler::session_scheduler()
 
108
  : logged_in(false), io_event(NULL), thread_attached(false)
 
109
{
 
110
}
 
111
 
 
112
 
 
113
session_scheduler::~session_scheduler()
 
114
{
 
115
  delete io_event;
 
116
}
 
117
 
 
118
 
 
119
session_scheduler::session_scheduler(const session_scheduler&)
 
120
  : logged_in(false), io_event(NULL), thread_attached(false)
 
121
{}
 
122
 
 
123
void session_scheduler::operator=(const session_scheduler&)
 
124
{}
 
125
 
 
126
bool session_scheduler::init(Session *parent_session)
 
127
{
 
128
  io_event= new struct event;
 
129
 
 
130
  if (io_event == NULL)
121
131
  {
122
 
    sql_print_error(_("Memory allocation error in thd_scheduler::init\n"));
 
132
    errmsg_printf(ERRMSG_LVL_ERROR, _("Memory allocation error in session_scheduler::init\n"));
123
133
    return true;
124
134
  }
125
 
  
126
 
  event_set(io_event, net_get_sd(&(parent_thd->net)), EV_READ, 
127
 
            libevent_io_callback, (void*)parent_thd);
128
 
    
129
 
  list.data= parent_thd;
130
 
  
 
135
  memset(io_event, 0, sizeof(*io_event));
 
136
 
 
137
  event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
 
138
            libevent_io_callback, (void*)parent_session);
 
139
 
 
140
  list.data= parent_session;
 
141
 
131
142
  return false;
132
143
}
133
144
 
136
147
  Attach/associate the connection with the OS thread, for command processing.
137
148
*/
138
149
 
139
 
bool thd_scheduler::thread_attach()
 
150
bool session_scheduler::thread_attach()
140
151
{
141
152
  assert(!thread_attached);
142
 
  THD* thd = (THD*)list.data;
143
 
  if (libevent_should_close_connection(thd) ||
144
 
      setup_connection_thread_globals(thd))
 
153
  Session* session = (Session*)list.data;
 
154
  if (libevent_should_close_connection(session) ||
 
155
      setup_connection_thread_globals(session))
145
156
  {
146
157
    return true;
147
158
  }
148
159
  my_errno= 0;
149
 
  thd->mysys_var->abort= 0;
 
160
  session->mysys_var->abort= 0;
150
161
  thread_attached= true;
151
 
  swap_dbug_explain();
 
162
 
152
163
  return false;
153
164
}
154
165
 
157
168
  Detach/disassociate the connection with the OS thread.
158
169
*/
159
170
 
160
 
void thd_scheduler::thread_detach()
 
171
void session_scheduler::thread_detach()
161
172
{
162
173
  if (thread_attached)
163
174
  {
164
 
    THD* thd = (THD*)list.data;
165
 
    thd->mysys_var= NULL;
 
175
    Session* session = (Session*)list.data;
 
176
    session->mysys_var= NULL;
166
177
    thread_attached= false;
167
 
    swap_dbug_explain();
168
178
  }
169
179
}
170
180
 
171
 
 
172
 
/*
173
 
  Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
174
 
 
175
 
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
176
 
  thread during a command, but each command is handled by a different thread.
177
 
*/
178
 
void thd_scheduler::swap_dbug_explain()
179
 
{
180
 
  char buffer[sizeof(dbug_explain_buf)];
181
 
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
182
 
}
183
 
 
184
181
/**
185
182
  Create all threads for the thread pool
186
183
 
199
196
  uint32_t i;
200
197
 
201
198
  event_init();
202
 
  
 
199
 
203
200
  created_threads= 0;
204
201
  killed_threads= 0;
205
202
  kill_pool_threads= false;
206
 
  
 
203
 
207
204
  pthread_mutex_init(&LOCK_event_loop, NULL);
208
 
  pthread_mutex_init(&LOCK_thd_add, NULL);
209
 
  
210
 
  /* set up the pipe used to add new thds to the event pool */
211
 
  if (init_pipe(thd_add_pipe))
212
 
  {
213
 
    sql_print_error(_("init_pipe(thd_add_pipe) error in libevent_init\n"));
214
 
    return(1);
215
 
  }
216
 
  /* set up the pipe used to kill thds in the event queue */
217
 
  if (init_pipe(thd_kill_pipe))
218
 
  {
219
 
    sql_print_error(_("init_pipe(thd_kill_pipe) error in libevent_init\n"));
220
 
    close(thd_add_pipe[0]);
221
 
    close(thd_add_pipe[1]);
222
 
    return(1);
223
 
  }
224
 
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
225
 
            libevent_add_thd_callback, NULL);
226
 
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
227
 
            libevent_kill_thd_callback, NULL);
228
 
 
229
 
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
 
205
  pthread_mutex_init(&LOCK_session_add, NULL);
 
206
 
 
207
  /* set up the pipe used to add new sessions to the event pool */
 
208
  if (init_pipe(session_add_pipe))
 
209
  {
 
210
    errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_add_pipe) error in libevent_init\n"));
 
211
    return(1);
 
212
  }
 
213
  /* set up the pipe used to kill sessions in the event queue */
 
214
  if (init_pipe(session_kill_pipe))
 
215
  {
 
216
    errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_kill_pipe) error in libevent_init\n"));
 
217
    close(session_add_pipe[0]);
 
218
    close(session_add_pipe[1]);
 
219
    return(1);
 
220
  }
 
221
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
 
222
            libevent_add_session_callback, NULL);
 
223
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
 
224
            libevent_kill_session_callback, NULL);
 
225
 
 
226
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
230
227
 {
231
 
   sql_print_error(_("thd_add_event event_add error in libevent_init\n"));
 
228
   errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
232
229
   libevent_end();
233
230
   return(1);
234
 
   
 
231
 
235
232
 }
236
233
  /* Set up the thread pool */
237
234
  created_threads= killed_threads= 0;
244
241
    if ((error= pthread_create(&thread, &connection_attrib,
245
242
                               libevent_thread_proc, 0)))
246
243
    {
247
 
      sql_print_error(_("Can't create completion port thread (error %d)"),
 
244
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
248
245
                      error);
249
246
      pthread_mutex_unlock(&LOCK_thread_count);
250
247
      libevent_end();                      // Cleanup
256
253
  while (created_threads != thread_pool_size)
257
254
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
258
255
  pthread_mutex_unlock(&LOCK_thread_count);
259
 
  
 
256
 
260
257
  return(false);
261
258
}
262
259
 
263
260
 
264
261
/*
265
262
  This is called when data is ready on the socket.
266
 
  
 
263
 
267
264
  NOTES
268
265
    This is only called by the thread that owns LOCK_event_loop.
269
 
  
270
 
    We add the thd that got the data to thds_need_processing, and 
 
266
 
 
267
    We add the session that got the data to sessions_need_processing, and
271
268
    cause the libevent event_loop() to terminate. Then this same thread will
272
 
    return from event_loop and pick the thd value back up for processing.
 
269
    return from event_loop and pick the session value back up for processing.
273
270
*/
274
271
 
275
272
void libevent_io_callback(int, short, void *ctx)
276
 
{    
 
273
{
277
274
  safe_mutex_assert_owner(&LOCK_event_loop);
278
 
  THD *thd= (THD*)ctx;
279
 
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
280
 
  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
 
275
  Session *session= (Session*)ctx;
 
276
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
 
277
  sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
281
278
}
282
279
 
283
280
/*
284
281
  This is called when we have a thread we want to be killed.
285
 
  
 
282
 
286
283
  NOTES
287
284
    This is only called by the thread that owns LOCK_event_loop.
288
285
*/
289
286
 
290
 
void libevent_kill_thd_callback(int Fd, short, void*)
291
 
{    
 
287
void libevent_kill_session_callback(int Fd, short, void*)
 
288
{
292
289
  safe_mutex_assert_owner(&LOCK_event_loop);
293
290
 
294
291
  /* clear the pending events */
296
293
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
297
294
  {}
298
295
 
299
 
  LIST* list= thds_waiting_for_io;
 
296
  LIST* list= sessions_waiting_for_io;
300
297
  while (list)
301
298
  {
302
 
    THD *thd= (THD*)list->data;
 
299
    Session *session= (Session*)list->data;
303
300
    list= list_rest(list);
304
 
    if (thd->killed == THD::KILL_CONNECTION)
 
301
    if (session->killed == Session::KILL_CONNECTION)
305
302
    {
306
303
      /*
307
304
        Delete from libevent and add to the processing queue.
308
305
      */
309
 
      event_del(thd->scheduler.io_event);
310
 
      thds_waiting_for_io= list_delete(thds_waiting_for_io,
311
 
                                       &thd->scheduler.list);
312
 
      thds_need_processing= list_add(thds_need_processing,
313
 
                                     &thd->scheduler.list);
 
306
      event_del(session->scheduler.io_event);
 
307
      sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
 
308
                                       &session->scheduler.list);
 
309
      sessions_need_processing= list_add(sessions_need_processing,
 
310
                                     &session->scheduler.list);
314
311
    }
315
312
  }
316
313
}
318
315
 
319
316
/*
320
317
  This is used to add connections to the pool. This callback is invoked from
321
 
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
 
318
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
322
319
  written to it.
323
 
  
 
320
 
324
321
  NOTES
325
322
    This is only called by the thread that owns LOCK_event_loop.
326
323
*/
327
324
 
328
 
void libevent_add_thd_callback(int Fd, short, void *)
329
 
 
325
void libevent_add_session_callback(int Fd, short, void *)
 
326
{
330
327
  safe_mutex_assert_owner(&LOCK_event_loop);
331
328
 
332
329
  /* clear the pending events */
334
331
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
335
332
  {}
336
333
 
337
 
  pthread_mutex_lock(&LOCK_thd_add);
338
 
  while (thds_need_adding)
 
334
  pthread_mutex_lock(&LOCK_session_add);
 
335
  while (sessions_need_adding)
339
336
  {
340
 
    /* pop the first thd off the list */
341
 
    THD* thd= (THD*)thds_need_adding->data;
342
 
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
343
 
 
344
 
    pthread_mutex_unlock(&LOCK_thd_add);
345
 
    
346
 
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
 
337
    /* pop the first session off the list */
 
338
    Session* session= (Session*)sessions_need_adding->data;
 
339
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
 
340
 
 
341
    pthread_mutex_unlock(&LOCK_session_add);
 
342
 
 
343
    if (!session->scheduler.logged_in || libevent_should_close_connection(session))
347
344
    {
348
345
      /*
349
 
        Add thd to thds_need_processing list. If it needs closing we'll close
 
346
        Add session to sessions_need_processing list. If it needs closing we'll close
350
347
        it outside of event_loop().
351
348
      */
352
 
      thds_need_processing= list_add(thds_need_processing,
353
 
                                     &thd->scheduler.list);
 
349
      sessions_need_processing= list_add(sessions_need_processing,
 
350
                                     &session->scheduler.list);
354
351
    }
355
352
    else
356
353
    {
357
354
      /* Add to libevent */
358
 
      if (event_add(thd->scheduler.io_event, NULL))
 
355
      if (event_add(session->scheduler.io_event, NULL))
359
356
      {
360
 
        sql_print_error(_("event_add error in libevent_add_thd_callback\n"));
361
 
        libevent_connection_close(thd);
362
 
      } 
 
357
        errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
 
358
        libevent_connection_close(session);
 
359
      }
363
360
      else
364
361
      {
365
 
        thds_waiting_for_io= list_add(thds_waiting_for_io,
366
 
                                      &thd->scheduler.list);
 
362
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
 
363
                                      &session->scheduler.list);
367
364
      }
368
365
    }
369
 
    pthread_mutex_lock(&LOCK_thd_add);
 
366
    pthread_mutex_lock(&LOCK_session_add);
370
367
  }
371
 
  pthread_mutex_unlock(&LOCK_thd_add);
 
368
  pthread_mutex_unlock(&LOCK_session_add);
372
369
}
373
370
 
374
371
 
379
376
    LOCK_thread_count is locked on entry. This function MUST unlock it!
380
377
*/
381
378
 
382
 
static void libevent_add_connection(THD *thd)
 
379
static void libevent_add_connection(Session *session)
383
380
{
384
 
  if (thd->scheduler.init(thd))
 
381
  if (session->scheduler.init(session))
385
382
  {
386
 
    sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
 
383
    errmsg_printf(ERRMSG_LVL_ERROR, _("Scheduler init error in libevent_add_new_connection\n"));
387
384
    pthread_mutex_unlock(&LOCK_thread_count);
388
 
    libevent_connection_close(thd);
 
385
    libevent_connection_close(session);
389
386
    return;
390
387
  }
391
 
  threads.append(thd);
392
 
  libevent_thd_add(thd);
393
 
  
 
388
  threads.append(session);
 
389
  libevent_session_add(session);
 
390
 
394
391
  pthread_mutex_unlock(&LOCK_thread_count);
395
392
  return;
396
393
}
398
395
 
399
396
/**
400
397
  @brief Signal a waiting connection it's time to die.
401
 
 
402
 
  @details This function will signal libevent the THD should be killed.
403
 
    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
 
398
 
 
399
  @details This function will signal libevent the Session should be killed.
 
400
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
404
401
    upon entry.
405
 
 
406
 
  @param[in]  thd The connection to kill
 
402
 
 
403
  @param[in]  session The connection to kill
407
404
*/
408
405
 
409
 
static void libevent_post_kill_notification(THD *)
 
406
static void libevent_post_kill_notification(Session *)
410
407
{
411
408
  /*
412
 
    Note, we just wake up libevent with an event that a THD should be killed,
413
 
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
414
 
    find the THDs it should kill.
415
 
    
 
409
    Note, we just wake up libevent with an event that a Session should be killed,
 
410
    It will search its list of sessions for session->killed ==  KILL_CONNECTION to
 
411
    find the Sessions it should kill.
 
412
 
416
413
    So we don't actually tell it which one and we don't actually use the
417
 
    THD being passed to us, but that's just a design detail that could change
 
414
    Session being passed to us, but that's just a design detail that could change
418
415
    later.
419
416
  */
420
417
  char c= 0;
421
 
  write(thd_kill_pipe[1], &c, sizeof(c));
 
418
  assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
422
419
}
423
420
 
424
421
 
426
423
  Close and delete a connection.
427
424
*/
428
425
 
429
 
static void libevent_connection_close(THD *thd)
 
426
static void libevent_connection_close(Session *session)
430
427
{
431
 
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
 
428
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages
432
429
 
433
 
  if (net_get_sd(&(thd->net)) >= 0)                  // not already closed
 
430
  if (net_get_sd(&(session->net)) >= 0)                  // not already closed
434
431
  {
435
 
    end_connection(thd);
436
 
    close_connection(thd, 0, 1);
 
432
    end_connection(session);
 
433
    session->close_connection(0, 1);
437
434
  }
438
 
  thd->scheduler.thread_detach();
439
 
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
 
435
  session->scheduler.thread_detach();
 
436
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
440
437
  pthread_mutex_unlock(&LOCK_thread_count);
441
438
 
442
439
  return;
444
441
 
445
442
 
446
443
/*
447
 
  Returns true if we should close and delete a THD connection.
 
444
  Returns true if we should close and delete a Session connection.
448
445
*/
449
446
 
450
 
static bool libevent_should_close_connection(THD* thd)
 
447
static bool libevent_should_close_connection(Session* session)
451
448
{
452
 
  return net_should_close(&(thd->net)) ||
453
 
         thd->killed == THD::KILL_CONNECTION;
 
449
  return net_should_close(&(session->net)) ||
 
450
         session->killed == Session::KILL_CONNECTION;
454
451
}
455
452
 
456
453
 
464
461
  if (init_new_connection_handler_thread())
465
462
  {
466
463
    my_thread_global_end();
467
 
    sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
 
464
    errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: my_thread_init() failed\n"));
468
465
    exit(1);
469
466
  }
470
467
 
477
474
  if (created_threads == thread_pool_size)
478
475
    (void) pthread_cond_signal(&COND_thread_count);
479
476
  (void) pthread_mutex_unlock(&LOCK_thread_count);
480
 
  
 
477
 
481
478
  for (;;)
482
479
  {
483
 
    THD *thd= NULL;
 
480
    Session *session= NULL;
484
481
    (void) pthread_mutex_lock(&LOCK_event_loop);
485
 
    
486
 
    /* get thd(s) to process */
487
 
    while (!thds_need_processing)
 
482
 
 
483
    /* get session(s) to process */
 
484
    while (!sessions_need_processing)
488
485
    {
489
486
      if (kill_pool_threads)
490
487
      {
494
491
      }
495
492
      event_loop(EVLOOP_ONCE);
496
493
    }
497
 
    
498
 
    /* pop the first thd off the list */
499
 
    thd= (THD*)thds_need_processing->data;
500
 
    thds_need_processing= list_delete(thds_need_processing,
501
 
                                      thds_need_processing);
502
 
    
 
494
 
 
495
    /* pop the first session off the list */
 
496
    session= (Session*)sessions_need_processing->data;
 
497
    sessions_need_processing= list_delete(sessions_need_processing,
 
498
                                      sessions_need_processing);
 
499
 
503
500
    (void) pthread_mutex_unlock(&LOCK_event_loop);
504
 
    
505
 
    /* now we process the connection (thd) */
506
 
    
507
 
    /* set up the thd<->thread links. */
508
 
    thd->thread_stack= (char*) &thd;
509
 
    
510
 
    if (thd->scheduler.thread_attach())
 
501
 
 
502
    /* now we process the connection (session) */
 
503
 
 
504
    /* set up the session<->thread links. */
 
505
    session->thread_stack= (char*) &session;
 
506
 
 
507
    if (session->scheduler.thread_attach())
511
508
    {
512
 
      libevent_connection_close(thd);
 
509
      libevent_connection_close(session);
513
510
      continue;
514
511
    }
515
512
 
516
513
    /* is the connection logged in yet? */
517
 
    if (!thd->scheduler.logged_in)
 
514
    if (!session->scheduler.logged_in)
518
515
    {
519
 
      if (login_connection(thd))
 
516
      if (login_connection(session))
520
517
      {
521
518
        /* Failed to log in */
522
 
        libevent_connection_close(thd);
 
519
        libevent_connection_close(session);
523
520
        continue;
524
521
      }
525
522
      else
526
523
      {
527
524
        /* login successful */
528
 
        thd->scheduler.logged_in= true;
529
 
        prepare_new_connection_state(thd);
530
 
        if (!libevent_needs_immediate_processing(thd))
 
525
        session->scheduler.logged_in= true;
 
526
        prepare_new_connection_state(session);
 
527
        if (!libevent_needs_immediate_processing(session))
531
528
          continue; /* New connection is now waiting for data in libevent*/
532
529
      }
533
530
    }
535
532
    do
536
533
    {
537
534
      /* Process a query */
538
 
      if (do_command(thd))
 
535
      if (do_command(session))
539
536
      {
540
 
        libevent_connection_close(thd);
 
537
        libevent_connection_close(session);
541
538
        break;
542
539
      }
543
 
    } while (libevent_needs_immediate_processing(thd));
 
540
    } while (libevent_needs_immediate_processing(session));
544
541
  }
545
 
  
 
542
 
546
543
thread_exit:
547
544
  (void) pthread_mutex_lock(&LOCK_thread_count);
548
545
  killed_threads++;
555
552
 
556
553
 
557
554
/*
558
 
  Returns true if the connection needs immediate processing and false if 
 
555
  Returns true if the connection needs immediate processing and false if
559
556
  instead it's queued for libevent processing or closed,
560
557
*/
561
558
 
562
 
static bool libevent_needs_immediate_processing(THD *thd)
 
559
static bool libevent_needs_immediate_processing(Session *session)
563
560
{
564
 
  if (libevent_should_close_connection(thd))
 
561
  if (libevent_should_close_connection(session))
565
562
  {
566
 
    libevent_connection_close(thd);
 
563
    libevent_connection_close(session);
567
564
    return false;
568
565
  }
569
566
  /*
572
569
    Note: we cannot add for event processing because the whole request might
573
570
    already be buffered and we wouldn't receive an event.
574
571
  */
575
 
  if (net_more_data(&(thd->net)))
 
572
  if (net_more_data(&(session->net)))
576
573
    return true;
577
 
  
578
 
  thd->scheduler.thread_detach();
579
 
  libevent_thd_add(thd);
 
574
 
 
575
  session->scheduler.thread_detach();
 
576
  libevent_session_add(session);
580
577
  return false;
581
578
}
582
579
 
583
580
 
584
581
/*
585
 
  Adds a THD to queued for libevent processing.
586
 
  
 
582
  Adds a Session to queued for libevent processing.
 
583
 
587
584
  This call does not actually register the event with libevent.
588
 
  Instead, it places the THD onto a queue and signals libevent by writing
589
 
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
590
 
  be invoked which will find the THD on the queue and add it to libevent.
 
585
  Instead, it places the Session onto a queue and signals libevent by writing
 
586
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
 
587
  be invoked which will find the Session on the queue and add it to libevent.
591
588
*/
592
589
 
593
 
static void libevent_thd_add(THD* thd)
 
590
static void libevent_session_add(Session* session)
594
591
{
595
592
  char c=0;
596
 
  pthread_mutex_lock(&LOCK_thd_add);
 
593
  pthread_mutex_lock(&LOCK_session_add);
597
594
  /* queue for libevent */
598
 
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
 
595
  sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
599
596
  /* notify libevent */
600
 
  write(thd_add_pipe[1], &c, sizeof(c));
601
 
  pthread_mutex_unlock(&LOCK_thd_add);
 
597
  assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
598
  pthread_mutex_unlock(&LOCK_session_add);
602
599
}
603
600
 
604
601
 
609
606
static void libevent_end()
610
607
{
611
608
  (void) pthread_mutex_lock(&LOCK_thread_count);
612
 
  
 
609
 
613
610
  kill_pool_threads= true;
614
611
  while (killed_threads != created_threads)
615
612
  {
616
613
    /* wake up the event loop */
617
614
    char c= 0;
618
 
    write(thd_add_pipe[1], &c, sizeof(c));
 
615
    assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
619
616
 
620
617
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
621
618
  }
622
619
  (void) pthread_mutex_unlock(&LOCK_thread_count);
623
 
  
624
 
  event_del(&thd_add_event);
625
 
  close(thd_add_pipe[0]);
626
 
  close(thd_add_pipe[1]);
627
 
  event_del(&thd_kill_event);
628
 
  close(thd_kill_pipe[0]);
629
 
  close(thd_kill_pipe[1]);
 
620
 
 
621
  event_del(&session_add_event);
 
622
  close(session_add_pipe[0]);
 
623
  close(session_add_pipe[1]);
 
624
  event_del(&session_kill_event);
 
625
  close(session_kill_pipe[0]);
 
626
  close(session_kill_pipe[1]);
630
627
 
631
628
  (void) pthread_mutex_destroy(&LOCK_event_loop);
632
 
  (void) pthread_mutex_destroy(&LOCK_thd_add);
 
629
  (void) pthread_mutex_destroy(&LOCK_session_add);
633
630
  return;
634
631
}
635
632