~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/scheduler.cc

  • Committer: Stewart Smith
  • Date: 2009-01-12 05:43:13 UTC
  • mto: (784.1.4 for-brian)
  • mto: This revision was merged to the branch mainline in revision 785.
  • Revision ID: stewart@flamingspork.com-20090112054313-edk6kpf4l6kpz4j7
fix archive_basic for drizzle

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
*/
19
19
 
20
20
#include <drizzled/server_includes.h>
21
 
#include "event.h"
22
 
 
 
21
#include <libdrizzle/libdrizzle.h>
 
22
#include <event.h>
 
23
#include <drizzled/gettext.h>
 
24
#include <drizzled/sql_parse.h>
 
25
#include <drizzled/scheduler.h>
 
26
#include <drizzled/session.h>
 
27
/* API for connecting, logging in to a drizzled server */
 
28
#include <drizzled/connect.h>
23
29
 
24
30
/*
25
31
  'Dummy' functions to be used when we don't need any handling for a scheduler
27
33
 */
28
34
 
29
35
static bool init_dummy(void) {return 0;}
30
 
static void post_kill_dummy(THD *thd __attribute__((unused))) {}
 
36
static void post_kill_dummy(Session *session __attribute__((unused))) {}
31
37
static void end_dummy(void) {}
32
 
static bool end_thread_dummy(THD *thd __attribute__((unused)),
 
38
static bool end_thread_dummy(Session *session __attribute__((unused)),
33
39
                             bool cache_thread __attribute__((unused)))
34
40
{ return 0; }
35
41
 
46
52
   end_thread(end_thread_dummy), end(end_dummy)
47
53
{}
48
54
 
49
 
static uint created_threads, killed_threads;
 
55
static uint32_t created_threads, killed_threads;
50
56
static bool kill_pool_threads;
51
57
 
52
 
static struct event thd_add_event;
53
 
static struct event thd_kill_event;
54
 
 
55
 
static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
56
 
static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
57
 
 
58
 
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
59
 
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
58
static struct event session_add_event;
 
59
static struct event session_kill_event;
 
60
 
 
61
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
 
62
static LIST *sessions_need_adding;    /* list of sessions to add to libevent queue */
 
63
 
 
64
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
65
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
60
66
 
61
67
/*
62
 
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
63
 
  event_del) and thds_need_processing and thds_waiting_for_io.
 
68
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and
 
69
  event_del) and sessions_need_processing and sessions_waiting_for_io.
64
70
*/
65
71
static pthread_mutex_t LOCK_event_loop;
66
 
static LIST *thds_need_processing; /* list of thds that needs some processing */
67
 
static LIST *thds_waiting_for_io; /* list of thds with added events */
 
72
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
 
73
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
68
74
 
69
75
pthread_handler_t libevent_thread_proc(void *arg);
70
76
static void libevent_end();
71
 
static bool libevent_needs_immediate_processing(THD *thd);
72
 
static void libevent_connection_close(THD *thd);
73
 
static bool libevent_should_close_connection(THD* thd);
74
 
static void libevent_thd_add(THD* thd);
 
77
static bool libevent_needs_immediate_processing(Session *session);
 
78
static void libevent_connection_close(Session *session);
 
79
static bool libevent_should_close_connection(Session* session);
 
80
static void libevent_session_add(Session* session);
75
81
void libevent_io_callback(int Fd, short Operation, void *ctx);
76
 
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
77
 
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
 
82
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
 
83
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
78
84
 
79
85
 
80
86
/*
94
100
 
95
101
 
96
102
/*
97
 
  thd_scheduler keeps the link between THD and events.
98
 
  It's embedded in the THD class.
 
103
  session_scheduler keeps the link between Session and events.
 
104
  It's embedded in the Session class.
99
105
*/
100
106
 
101
 
thd_scheduler::thd_scheduler()
102
 
  : logged_in(false), io_event(NULL), thread_attached(false)
103
 
{  
104
 
  dbug_explain_buf[0]= 0;
105
 
}
106
 
 
107
 
 
108
 
thd_scheduler::~thd_scheduler()
109
 
{
110
 
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
111
 
}
112
 
 
113
 
 
114
 
bool thd_scheduler::init(THD *parent_thd)
115
 
{
116
 
  io_event=
117
 
    (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
118
 
    
119
 
  if (!io_event)
 
107
session_scheduler::session_scheduler()
 
108
  : logged_in(false), io_event(NULL), thread_attached(false)
 
109
{
 
110
}
 
111
 
 
112
 
 
113
session_scheduler::~session_scheduler()
 
114
{
 
115
  delete io_event;
 
116
}
 
117
 
 
118
 
 
119
session_scheduler::session_scheduler(const session_scheduler&)
 
120
  : logged_in(false), io_event(NULL), thread_attached(false)
 
121
{}
 
122
 
 
123
void session_scheduler::operator=(const session_scheduler&)
 
124
{}
 
125
 
 
126
bool session_scheduler::init(Session *parent_session)
 
127
{
 
128
  io_event= new struct event;
 
129
 
 
130
  if (io_event == NULL)
120
131
  {
121
 
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
 
132
    errmsg_printf(ERRMSG_LVL_ERROR, _("Memory allocation error in session_scheduler::init\n"));
122
133
    return true;
123
134
  }
124
 
  
125
 
  event_set(io_event, parent_thd->net.vio->sd, EV_READ, 
126
 
            libevent_io_callback, (void*)parent_thd);
127
 
    
128
 
  list.data= parent_thd;
129
 
  
 
135
  memset(io_event, 0, sizeof(*io_event));
 
136
 
 
137
  event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
 
138
            libevent_io_callback, (void*)parent_session);
 
139
 
 
140
  list.data= parent_session;
 
141
 
130
142
  return false;
131
143
}
132
144
 
135
147
  Attach/associate the connection with the OS thread, for command processing.
136
148
*/
137
149
 
138
 
bool thd_scheduler::thread_attach()
 
150
bool session_scheduler::thread_attach()
139
151
{
140
152
  assert(!thread_attached);
141
 
  THD* thd = (THD*)list.data;
142
 
  if (libevent_should_close_connection(thd) ||
143
 
      setup_connection_thread_globals(thd))
 
153
  Session* session = (Session*)list.data;
 
154
  if (libevent_should_close_connection(session) ||
 
155
      setup_connection_thread_globals(session))
144
156
  {
145
157
    return true;
146
158
  }
147
159
  my_errno= 0;
148
 
  thd->mysys_var->abort= 0;
 
160
  session->mysys_var->abort= 0;
149
161
  thread_attached= true;
150
 
  swap_dbug_explain();
 
162
 
151
163
  return false;
152
164
}
153
165
 
156
168
  Detach/disassociate the connection with the OS thread.
157
169
*/
158
170
 
159
 
void thd_scheduler::thread_detach()
 
171
void session_scheduler::thread_detach()
160
172
{
161
173
  if (thread_attached)
162
174
  {
163
 
    THD* thd = (THD*)list.data;
164
 
    thd->mysys_var= NULL;
 
175
    Session* session = (Session*)list.data;
 
176
    session->mysys_var= NULL;
165
177
    thread_attached= false;
166
 
    swap_dbug_explain();
167
178
  }
168
179
}
169
180
 
170
 
 
171
 
/*
172
 
  Swap the THD's dbug explain_buffer with the OS thread's dbug explain buffer.
173
 
 
174
 
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
175
 
  thread during a command, but each command is handled by a different thread.
176
 
*/
177
 
void thd_scheduler::swap_dbug_explain()
178
 
{
179
 
  char buffer[sizeof(dbug_explain_buf)];
180
 
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
181
 
}
182
 
 
183
181
/**
184
182
  Create all threads for the thread pool
185
183
 
195
193
 
196
194
static bool libevent_init(void)
197
195
{
198
 
  uint i;
 
196
  uint32_t i;
199
197
 
200
198
  event_init();
201
 
  
 
199
 
202
200
  created_threads= 0;
203
201
  killed_threads= 0;
204
202
  kill_pool_threads= false;
205
 
  
 
203
 
206
204
  pthread_mutex_init(&LOCK_event_loop, NULL);
207
 
  pthread_mutex_init(&LOCK_thd_add, NULL);
208
 
  
209
 
  /* set up the pipe used to add new thds to the event pool */
210
 
  if (init_pipe(thd_add_pipe))
211
 
  {
212
 
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
213
 
    return(1);
214
 
  }
215
 
  /* set up the pipe used to kill thds in the event queue */
216
 
  if (init_pipe(thd_kill_pipe))
217
 
  {
218
 
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
219
 
    close(thd_add_pipe[0]);
220
 
    close(thd_add_pipe[1]);
221
 
    return(1);
222
 
  }
223
 
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
224
 
            libevent_add_thd_callback, NULL);
225
 
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
226
 
            libevent_kill_thd_callback, NULL);
227
 
 
228
 
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
 
205
  pthread_mutex_init(&LOCK_session_add, NULL);
 
206
 
 
207
  /* set up the pipe used to add new sessions to the event pool */
 
208
  if (init_pipe(session_add_pipe))
 
209
  {
 
210
    errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_add_pipe) error in libevent_init\n"));
 
211
    return(1);
 
212
  }
 
213
  /* set up the pipe used to kill sessions in the event queue */
 
214
  if (init_pipe(session_kill_pipe))
 
215
  {
 
216
    errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_kill_pipe) error in libevent_init\n"));
 
217
    close(session_add_pipe[0]);
 
218
    close(session_add_pipe[1]);
 
219
    return(1);
 
220
  }
 
221
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
 
222
            libevent_add_session_callback, NULL);
 
223
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
 
224
            libevent_kill_session_callback, NULL);
 
225
 
 
226
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
229
227
 {
230
 
   sql_print_error("thd_add_event event_add error in libevent_init\n");
 
228
   errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
231
229
   libevent_end();
232
230
   return(1);
233
 
   
 
231
 
234
232
 }
235
233
  /* Set up the thread pool */
236
234
  created_threads= killed_threads= 0;
243
241
    if ((error= pthread_create(&thread, &connection_attrib,
244
242
                               libevent_thread_proc, 0)))
245
243
    {
246
 
      sql_print_error("Can't create completion port thread (error %d)",
 
244
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
247
245
                      error);
248
246
      pthread_mutex_unlock(&LOCK_thread_count);
249
247
      libevent_end();                      // Cleanup
255
253
  while (created_threads != thread_pool_size)
256
254
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
257
255
  pthread_mutex_unlock(&LOCK_thread_count);
258
 
  
 
256
 
259
257
  return(false);
260
258
}
261
259
 
262
260
 
263
261
/*
264
262
  This is called when data is ready on the socket.
265
 
  
 
263
 
266
264
  NOTES
267
265
    This is only called by the thread that owns LOCK_event_loop.
268
 
  
269
 
    We add the thd that got the data to thds_need_processing, and 
 
266
 
 
267
    We add the session that got the data to sessions_need_processing, and
270
268
    cause the libevent event_loop() to terminate. Then this same thread will
271
 
    return from event_loop and pick the thd value back up for processing.
 
269
    return from event_loop and pick the session value back up for processing.
272
270
*/
273
271
 
274
272
void libevent_io_callback(int, short, void *ctx)
275
 
{    
 
273
{
276
274
  safe_mutex_assert_owner(&LOCK_event_loop);
277
 
  THD *thd= (THD*)ctx;
278
 
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
279
 
  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
 
275
  Session *session= (Session*)ctx;
 
276
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
 
277
  sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
280
278
}
281
279
 
282
280
/*
283
281
  This is called when we have a thread we want to be killed.
284
 
  
 
282
 
285
283
  NOTES
286
284
    This is only called by the thread that owns LOCK_event_loop.
287
285
*/
288
286
 
289
 
void libevent_kill_thd_callback(int Fd, short, void*)
290
 
{    
 
287
void libevent_kill_session_callback(int Fd, short, void*)
 
288
{
291
289
  safe_mutex_assert_owner(&LOCK_event_loop);
292
290
 
293
291
  /* clear the pending events */
295
293
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
296
294
  {}
297
295
 
298
 
  LIST* list= thds_waiting_for_io;
 
296
  LIST* list= sessions_waiting_for_io;
299
297
  while (list)
300
298
  {
301
 
    THD *thd= (THD*)list->data;
 
299
    Session *session= (Session*)list->data;
302
300
    list= list_rest(list);
303
 
    if (thd->killed == THD::KILL_CONNECTION)
 
301
    if (session->killed == Session::KILL_CONNECTION)
304
302
    {
305
303
      /*
306
304
        Delete from libevent and add to the processing queue.
307
305
      */
308
 
      event_del(thd->scheduler.io_event);
309
 
      thds_waiting_for_io= list_delete(thds_waiting_for_io,
310
 
                                       &thd->scheduler.list);
311
 
      thds_need_processing= list_add(thds_need_processing,
312
 
                                     &thd->scheduler.list);
 
306
      event_del(session->scheduler.io_event);
 
307
      sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
 
308
                                       &session->scheduler.list);
 
309
      sessions_need_processing= list_add(sessions_need_processing,
 
310
                                     &session->scheduler.list);
313
311
    }
314
312
  }
315
313
}
317
315
 
318
316
/*
319
317
  This is used to add connections to the pool. This callback is invoked from
320
 
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
 
318
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
321
319
  written to it.
322
 
  
 
320
 
323
321
  NOTES
324
322
    This is only called by the thread that owns LOCK_event_loop.
325
323
*/
326
324
 
327
 
void libevent_add_thd_callback(int Fd, short, void *)
328
 
 
325
void libevent_add_session_callback(int Fd, short, void *)
 
326
{
329
327
  safe_mutex_assert_owner(&LOCK_event_loop);
330
328
 
331
329
  /* clear the pending events */
333
331
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
334
332
  {}
335
333
 
336
 
  pthread_mutex_lock(&LOCK_thd_add);
337
 
  while (thds_need_adding)
 
334
  pthread_mutex_lock(&LOCK_session_add);
 
335
  while (sessions_need_adding)
338
336
  {
339
 
    /* pop the first thd off the list */
340
 
    THD* thd= (THD*)thds_need_adding->data;
341
 
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
342
 
 
343
 
    pthread_mutex_unlock(&LOCK_thd_add);
344
 
    
345
 
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
 
337
    /* pop the first session off the list */
 
338
    Session* session= (Session*)sessions_need_adding->data;
 
339
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
 
340
 
 
341
    pthread_mutex_unlock(&LOCK_session_add);
 
342
 
 
343
    if (!session->scheduler.logged_in || libevent_should_close_connection(session))
346
344
    {
347
345
      /*
348
 
        Add thd to thds_need_processing list. If it needs closing we'll close
 
346
        Add session to sessions_need_processing list. If it needs closing we'll close
349
347
        it outside of event_loop().
350
348
      */
351
 
      thds_need_processing= list_add(thds_need_processing,
352
 
                                     &thd->scheduler.list);
 
349
      sessions_need_processing= list_add(sessions_need_processing,
 
350
                                     &session->scheduler.list);
353
351
    }
354
352
    else
355
353
    {
356
354
      /* Add to libevent */
357
 
      if (event_add(thd->scheduler.io_event, NULL))
 
355
      if (event_add(session->scheduler.io_event, NULL))
358
356
      {
359
 
        sql_print_error("event_add error in libevent_add_thd_callback\n");
360
 
        libevent_connection_close(thd);
361
 
      } 
 
357
        errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
 
358
        libevent_connection_close(session);
 
359
      }
362
360
      else
363
361
      {
364
 
        thds_waiting_for_io= list_add(thds_waiting_for_io,
365
 
                                      &thd->scheduler.list);
 
362
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
 
363
                                      &session->scheduler.list);
366
364
      }
367
365
    }
368
 
    pthread_mutex_lock(&LOCK_thd_add);
 
366
    pthread_mutex_lock(&LOCK_session_add);
369
367
  }
370
 
  pthread_mutex_unlock(&LOCK_thd_add);
 
368
  pthread_mutex_unlock(&LOCK_session_add);
371
369
}
372
370
 
373
371
 
378
376
    LOCK_thread_count is locked on entry. This function MUST unlock it!
379
377
*/
380
378
 
381
 
static void libevent_add_connection(THD *thd)
 
379
static void libevent_add_connection(Session *session)
382
380
{
383
 
  if (thd->scheduler.init(thd))
 
381
  if (session->scheduler.init(session))
384
382
  {
385
 
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
 
383
    errmsg_printf(ERRMSG_LVL_ERROR, _("Scheduler init error in libevent_add_new_connection\n"));
386
384
    pthread_mutex_unlock(&LOCK_thread_count);
387
 
    libevent_connection_close(thd);
 
385
    libevent_connection_close(session);
388
386
    return;
389
387
  }
390
 
  threads.append(thd);
391
 
  libevent_thd_add(thd);
392
 
  
 
388
  threads.append(session);
 
389
  libevent_session_add(session);
 
390
 
393
391
  pthread_mutex_unlock(&LOCK_thread_count);
394
392
  return;
395
393
}
397
395
 
398
396
/**
399
397
  @brief Signal a waiting connection it's time to die.
400
 
 
401
 
  @details This function will signal libevent the THD should be killed.
402
 
    Either the global LOCK_thd_count or the THD's LOCK_delete must be locked
 
398
 
 
399
  @details This function will signal libevent the Session should be killed.
 
400
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
403
401
    upon entry.
404
 
 
405
 
  @param[in]  thd The connection to kill
 
402
 
 
403
  @param[in]  session The connection to kill
406
404
*/
407
405
 
408
 
static void libevent_post_kill_notification(THD *)
 
406
static void libevent_post_kill_notification(Session *)
409
407
{
410
408
  /*
411
 
    Note, we just wake up libevent with an event that a THD should be killed,
412
 
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
413
 
    find the THDs it should kill.
414
 
    
 
409
    Note, we just wake up libevent with an event that a Session should be killed,
 
410
    It will search its list of sessions for session->killed ==  KILL_CONNECTION to
 
411
    find the Sessions it should kill.
 
412
 
415
413
    So we don't actually tell it which one and we don't actually use the
416
 
    THD being passed to us, but that's just a design detail that could change
 
414
    Session being passed to us, but that's just a design detail that could change
417
415
    later.
418
416
  */
419
417
  char c= 0;
420
 
  write(thd_kill_pipe[1], &c, sizeof(c));
 
418
  assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
421
419
}
422
420
 
423
421
 
425
423
  Close and delete a connection.
426
424
*/
427
425
 
428
 
static void libevent_connection_close(THD *thd)
 
426
static void libevent_connection_close(Session *session)
429
427
{
430
 
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
 
428
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages
431
429
 
432
 
  if (thd->net.vio->sd >= 0)                  // not already closed
 
430
  if (net_get_sd(&(session->net)) >= 0)                  // not already closed
433
431
  {
434
 
    end_connection(thd);
435
 
    close_connection(thd, 0, 1);
 
432
    end_connection(session);
 
433
    session->close_connection(0, 1);
436
434
  }
437
 
  thd->scheduler.thread_detach();
438
 
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
 
435
  session->scheduler.thread_detach();
 
436
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
439
437
  pthread_mutex_unlock(&LOCK_thread_count);
440
438
 
441
439
  return;
443
441
 
444
442
 
445
443
/*
446
 
  Returns true if we should close and delete a THD connection.
 
444
  Returns true if we should close and delete a Session connection.
447
445
*/
448
446
 
449
 
static bool libevent_should_close_connection(THD* thd)
 
447
static bool libevent_should_close_connection(Session* session)
450
448
{
451
 
  return thd->net.error ||
452
 
         thd->net.vio == 0 ||
453
 
         thd->killed == THD::KILL_CONNECTION;
 
449
  return net_should_close(&(session->net)) ||
 
450
         session->killed == Session::KILL_CONNECTION;
454
451
}
455
452
 
456
453
 
464
461
  if (init_new_connection_handler_thread())
465
462
  {
466
463
    my_thread_global_end();
467
 
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
 
464
    errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: my_thread_init() failed\n"));
468
465
    exit(1);
469
466
  }
470
467
 
477
474
  if (created_threads == thread_pool_size)
478
475
    (void) pthread_cond_signal(&COND_thread_count);
479
476
  (void) pthread_mutex_unlock(&LOCK_thread_count);
480
 
  
 
477
 
481
478
  for (;;)
482
479
  {
483
 
    THD *thd= NULL;
 
480
    Session *session= NULL;
484
481
    (void) pthread_mutex_lock(&LOCK_event_loop);
485
 
    
486
 
    /* get thd(s) to process */
487
 
    while (!thds_need_processing)
 
482
 
 
483
    /* get session(s) to process */
 
484
    while (!sessions_need_processing)
488
485
    {
489
486
      if (kill_pool_threads)
490
487
      {
494
491
      }
495
492
      event_loop(EVLOOP_ONCE);
496
493
    }
497
 
    
498
 
    /* pop the first thd off the list */
499
 
    thd= (THD*)thds_need_processing->data;
500
 
    thds_need_processing= list_delete(thds_need_processing,
501
 
                                      thds_need_processing);
502
 
    
 
494
 
 
495
    /* pop the first session off the list */
 
496
    session= (Session*)sessions_need_processing->data;
 
497
    sessions_need_processing= list_delete(sessions_need_processing,
 
498
                                      sessions_need_processing);
 
499
 
503
500
    (void) pthread_mutex_unlock(&LOCK_event_loop);
504
 
    
505
 
    /* now we process the connection (thd) */
506
 
    
507
 
    /* set up the thd<->thread links. */
508
 
    thd->thread_stack= (char*) &thd;
509
 
    
510
 
    if (thd->scheduler.thread_attach())
 
501
 
 
502
    /* now we process the connection (session) */
 
503
 
 
504
    /* set up the session<->thread links. */
 
505
    session->thread_stack= (char*) &session;
 
506
 
 
507
    if (session->scheduler.thread_attach())
511
508
    {
512
 
      libevent_connection_close(thd);
 
509
      libevent_connection_close(session);
513
510
      continue;
514
511
    }
515
512
 
516
513
    /* is the connection logged in yet? */
517
 
    if (!thd->scheduler.logged_in)
 
514
    if (!session->scheduler.logged_in)
518
515
    {
519
 
      if (login_connection(thd))
 
516
      if (login_connection(session))
520
517
      {
521
518
        /* Failed to log in */
522
 
        libevent_connection_close(thd);
 
519
        libevent_connection_close(session);
523
520
        continue;
524
521
      }
525
522
      else
526
523
      {
527
524
        /* login successful */
528
 
        thd->scheduler.logged_in= true;
529
 
        prepare_new_connection_state(thd);
530
 
        if (!libevent_needs_immediate_processing(thd))
 
525
        session->scheduler.logged_in= true;
 
526
        prepare_new_connection_state(session);
 
527
        if (!libevent_needs_immediate_processing(session))
531
528
          continue; /* New connection is now waiting for data in libevent*/
532
529
      }
533
530
    }
535
532
    do
536
533
    {
537
534
      /* Process a query */
538
 
      if (do_command(thd))
 
535
      if (do_command(session))
539
536
      {
540
 
        libevent_connection_close(thd);
 
537
        libevent_connection_close(session);
541
538
        break;
542
539
      }
543
 
    } while (libevent_needs_immediate_processing(thd));
 
540
    } while (libevent_needs_immediate_processing(session));
544
541
  }
545
 
  
 
542
 
546
543
thread_exit:
547
544
  (void) pthread_mutex_lock(&LOCK_thread_count);
548
545
  killed_threads++;
555
552
 
556
553
 
557
554
/*
558
 
  Returns true if the connection needs immediate processing and false if 
 
555
  Returns true if the connection needs immediate processing and false if
559
556
  instead it's queued for libevent processing or closed,
560
557
*/
561
558
 
562
 
static bool libevent_needs_immediate_processing(THD *thd)
 
559
static bool libevent_needs_immediate_processing(Session *session)
563
560
{
564
 
  if (libevent_should_close_connection(thd))
 
561
  if (libevent_should_close_connection(session))
565
562
  {
566
 
    libevent_connection_close(thd);
 
563
    libevent_connection_close(session);
567
564
    return false;
568
565
  }
569
566
  /*
572
569
    Note: we cannot add for event processing because the whole request might
573
570
    already be buffered and we wouldn't receive an event.
574
571
  */
575
 
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
 
572
  if (net_more_data(&(session->net)))
576
573
    return true;
577
 
  
578
 
  thd->scheduler.thread_detach();
579
 
  libevent_thd_add(thd);
 
574
 
 
575
  session->scheduler.thread_detach();
 
576
  libevent_session_add(session);
580
577
  return false;
581
578
}
582
579
 
583
580
 
584
581
/*
585
 
  Adds a THD to queued for libevent processing.
586
 
  
 
582
  Adds a Session to queued for libevent processing.
 
583
 
587
584
  This call does not actually register the event with libevent.
588
 
  Instead, it places the THD onto a queue and signals libevent by writing
589
 
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
590
 
  be invoked which will find the THD on the queue and add it to libevent.
 
585
  Instead, it places the Session onto a queue and signals libevent by writing
 
586
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
 
587
  be invoked which will find the Session on the queue and add it to libevent.
591
588
*/
592
589
 
593
 
static void libevent_thd_add(THD* thd)
 
590
static void libevent_session_add(Session* session)
594
591
{
595
592
  char c=0;
596
 
  pthread_mutex_lock(&LOCK_thd_add);
 
593
  pthread_mutex_lock(&LOCK_session_add);
597
594
  /* queue for libevent */
598
 
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
 
595
  sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
599
596
  /* notify libevent */
600
 
  write(thd_add_pipe[1], &c, sizeof(c));
601
 
  pthread_mutex_unlock(&LOCK_thd_add);
 
597
  assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
598
  pthread_mutex_unlock(&LOCK_session_add);
602
599
}
603
600
 
604
601
 
609
606
static void libevent_end()
610
607
{
611
608
  (void) pthread_mutex_lock(&LOCK_thread_count);
612
 
  
 
609
 
613
610
  kill_pool_threads= true;
614
611
  while (killed_threads != created_threads)
615
612
  {
616
613
    /* wake up the event loop */
617
614
    char c= 0;
618
 
    write(thd_add_pipe[1], &c, sizeof(c));
 
615
    assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
619
616
 
620
617
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
621
618
  }
622
619
  (void) pthread_mutex_unlock(&LOCK_thread_count);
623
 
  
624
 
  event_del(&thd_add_event);
625
 
  close(thd_add_pipe[0]);
626
 
  close(thd_add_pipe[1]);
627
 
  event_del(&thd_kill_event);
628
 
  close(thd_kill_pipe[0]);
629
 
  close(thd_kill_pipe[1]);
 
620
 
 
621
  event_del(&session_add_event);
 
622
  close(session_add_pipe[0]);
 
623
  close(session_add_pipe[1]);
 
624
  event_del(&session_kill_event);
 
625
  close(session_kill_pipe[0]);
 
626
  close(session_kill_pipe[1]);
630
627
 
631
628
  (void) pthread_mutex_destroy(&LOCK_event_loop);
632
 
  (void) pthread_mutex_destroy(&LOCK_thd_add);
 
629
  (void) pthread_mutex_destroy(&LOCK_session_add);
633
630
  return;
634
631
}
635
632