~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2007 MySQL AB
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
/*
17
  Implementation for the thread scheduler
18
*/
19
243.1.17 by Jay Pipes
FINAL PHASE removal of mysql_priv.h (Bye, bye my friend.)
20
#include <drizzled/server_includes.h>
383.1.54 by Monty Taylor
More violite refactoring/removal.
21
#include <libdrizzle/libdrizzle.h>
1 by brian
clean slate
22
#include "event.h"
549 by Monty Taylor
Took gettext.h out of header files.
23
#include <drizzled/gettext.h>
1 by brian
clean slate
24
25
26
/*
27
  'Dummy' functions to be used when we don't need any handling for a scheduler
28
  event
29
 */
30
31
/* Default scheduler init hook: does nothing and reports success (false). */
static bool init_dummy(void)
{
  return false;
}
520.1.22 by Brian Aker
Second pass of thd cleanup
32
/* Default post-kill hook: ignores the session and does nothing. */
static void post_kill_dummy(Session *session __attribute__((unused)))
{
}
1 by brian
clean slate
33
/* Default scheduler shutdown hook: does nothing. */
static void end_dummy(void)
{
}
520.1.22 by Brian Aker
Second pass of thd cleanup
34
static bool end_thread_dummy(Session *session __attribute__((unused)),
212.1.3 by Monty Taylor
Renamed __attribute__((__unused__)) to __attribute__((unused)).
35
                             bool cache_thread __attribute__((unused)))
77.1.45 by Monty Taylor
Warning fixes.
36
{ return 0; }
1 by brian
clean slate
37
38
/*
39
  Initialize default scheduler with dummy functions so that setup functions
40
  only need to declare those that are relevant for their usage
41
*/
42
43
/*
  Every hook starts out pointing at a no-op (or the generic connection-thread
  initializer); add_connection has no usable default and is left null, so a
  concrete scheduler must install one.
*/
scheduler_functions::scheduler_functions()
  : init(init_dummy),
    init_new_connection_thread(init_new_connection_handler_thread),
    add_connection(0),                          // Must be defined
    post_kill_notification(post_kill_dummy),
    end_thread(end_thread_dummy),
    end(end_dummy)
{
}
50
482 by Brian Aker
Remove uint.
51
static uint32_t created_threads, killed_threads;
1 by brian
clean slate
52
static bool kill_pool_threads;
53
520.1.22 by Brian Aker
Second pass of thd cleanup
54
static struct event session_add_event;
55
static struct event session_kill_event;
56
57
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
58
static LIST *sessions_need_adding;    /* list of sessions to add to libevent queue */
59
60
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
61
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
1 by brian
clean slate
62
63
/*
64
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
520.1.22 by Brian Aker
Second pass of thd cleanup
65
  event_del) and sessions_need_processing and sessions_waiting_for_io.
1 by brian
clean slate
66
*/
67
static pthread_mutex_t LOCK_event_loop;
520.1.22 by Brian Aker
Second pass of thd cleanup
68
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
69
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
1 by brian
clean slate
70
71
pthread_handler_t libevent_thread_proc(void *arg);
72
static void libevent_end();
520.1.22 by Brian Aker
Second pass of thd cleanup
73
static bool libevent_needs_immediate_processing(Session *session);
74
static void libevent_connection_close(Session *session);
75
static bool libevent_should_close_connection(Session* session);
76
static void libevent_session_add(Session* session);
1 by brian
clean slate
77
void libevent_io_callback(int Fd, short Operation, void *ctx);
520.1.22 by Brian Aker
Second pass of thd cleanup
78
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
79
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
1 by brian
clean slate
80
81
82
/*
83
  Create a pipe and set to non-blocking.
55 by brian
Update for using real bool types.
84
  Returns true if there is an error.
1 by brian
clean slate
85
*/
86
87
/*
  Helper for init_pipe(): switch one descriptor to non-blocking mode while
  preserving its other status flags.
  Returns true if there is an error.
*/
static bool init_pipe_set_nonblocking(int fd)
{
  const int flags= fcntl(fd, F_GETFL);
  return flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1;
}

/*
  Create a pipe and make BOTH ends non-blocking.

  pipe_fds receives the read end in [0] and the write end in [1].
  Returns true if there is an error; in that case any descriptors that were
  created are closed again, so the caller has nothing to clean up.

  (A stray ';' previously terminated the return expression after the read
  end, leaving the write end in blocking mode.)
*/
static bool init_pipe(int pipe_fds[])
{
  if (pipe(pipe_fds) < 0)
    return true;

  if (init_pipe_set_nonblocking(pipe_fds[0]) ||
      init_pipe_set_nonblocking(pipe_fds[1]))
  {
    /* Do not leak the half-configured pipe. */
    close(pipe_fds[0]);
    close(pipe_fds[1]);
    return true;
  }

  return false;
}
96
97
98
/*
520.1.22 by Brian Aker
Second pass of thd cleanup
99
  session_scheduler keeps the link between Session and events.
520.1.21 by Brian Aker
THD -> Session rename
100
  It's embedded in the Session class.
1 by brian
clean slate
101
*/
102
520.1.22 by Brian Aker
Second pass of thd cleanup
103
/* A fresh scheduler slot: not logged in, no libevent event, no thread. */
session_scheduler::session_scheduler()
  : logged_in(false),
    io_event(NULL),
    thread_attached(false)
{
}
107
108
520.1.22 by Brian Aker
Second pass of thd cleanup
109
/* Releases the event structure allocated by init(); free(NULL) is a no-op. */
session_scheduler::~session_scheduler()
{
  free(io_event);
}
113
114
520.1.22 by Brian Aker
Second pass of thd cleanup
115
/*
  Copy construction deliberately ignores the source object: the new instance
  starts in the same pristine state as a default-constructed one, so the
  io_event pointer is never shared between two owners.
*/
session_scheduler::session_scheduler(const session_scheduler&)
  : logged_in(false),
    io_event(NULL),
    thread_attached(false)
{
}
118
520.1.22 by Brian Aker
Second pass of thd cleanup
119
void session_scheduler::operator=(const session_scheduler&)
509.2.5 by Monty Taylor
Override copy ctr and op= because we have pointer members.
120
{}
121
520.1.22 by Brian Aker
Second pass of thd cleanup
122
bool session_scheduler::init(Session *parent_session)
{
  /* Zero-filled storage for the libevent read event of this session. */
  io_event= (struct event*) my_malloc(sizeof(*io_event),
                                      MYF(MY_ZEROFILL|MY_WME));
  if (io_event == NULL)
  {
    sql_print_error(_("Memory allocation error in session_scheduler::init\n"));
    return true;
  }

  /* Fire libevent_io_callback(parent_session) when the socket is readable. */
  event_set(io_event,
            net_get_sd(&(parent_session->net)),
            EV_READ,
            libevent_io_callback,
            (void*) parent_session);

  /* The embedded list node carries the owning session as its payload. */
  list.data= parent_session;

  return false;
}
140
141
142
/*
143
  Attach/associate the connection with the OS thread, for command processing.
144
*/
145
520.1.22 by Brian Aker
Second pass of thd cleanup
146
bool session_scheduler::thread_attach()
1 by brian
clean slate
147
{
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
148
  assert(!thread_attached);
520.1.22 by Brian Aker
Second pass of thd cleanup
149
  Session* session = (Session*)list.data;
150
  if (libevent_should_close_connection(session) ||
151
      setup_connection_thread_globals(session))
1 by brian
clean slate
152
  {
55 by brian
Update for using real bool types.
153
    return true;
1 by brian
clean slate
154
  }
155
  my_errno= 0;
520.1.22 by Brian Aker
Second pass of thd cleanup
156
  session->mysys_var->abort= 0;
55 by brian
Update for using real bool types.
157
  thread_attached= true;
519 by Brian Aker
Small cleanup in scheduler
158
55 by brian
Update for using real bool types.
159
  return false;
1 by brian
clean slate
160
}
161
162
163
/*
164
  Detach/disassociate the connection with the OS thread.
165
*/
166
520.1.22 by Brian Aker
Second pass of thd cleanup
167
void session_scheduler::thread_detach()
1 by brian
clean slate
168
{
169
  if (thread_attached)
170
  {
520.1.22 by Brian Aker
Second pass of thd cleanup
171
    Session* session = (Session*)list.data;
172
    session->mysys_var= NULL;
55 by brian
Update for using real bool types.
173
    thread_attached= false;
1 by brian
clean slate
174
  }
175
}
176
177
/**
178
  Create all threads for the thread pool
179
180
  NOTES
181
    After threads are created we wait until all threads have signaled that
182
    they have started before we return
183
184
  RETURN
185
    0  ok
186
    1  We got an error creating the thread pool
187
       In this case we will abort all created threads
188
*/
189
190
static bool libevent_init(void)
191
{
482 by Brian Aker
Remove uint.
192
  uint32_t i;
1 by brian
clean slate
193
194
  event_init();
195
  
196
  created_threads= 0;
197
  killed_threads= 0;
55 by brian
Update for using real bool types.
198
  kill_pool_threads= false;
1 by brian
clean slate
199
  
200
  pthread_mutex_init(&LOCK_event_loop, NULL);
520.1.22 by Brian Aker
Second pass of thd cleanup
201
  pthread_mutex_init(&LOCK_session_add, NULL);
1 by brian
clean slate
202
  
520.1.22 by Brian Aker
Second pass of thd cleanup
203
  /* set up the pipe used to add new sessions to the event pool */
204
  if (init_pipe(session_add_pipe))
205
  {
206
    sql_print_error(_("init_pipe(session_add_pipe) error in libevent_init\n"));
207
    return(1);
208
  }
209
  /* set up the pipe used to kill sessions in the event queue */
210
  if (init_pipe(session_kill_pipe))
211
  {
212
    sql_print_error(_("init_pipe(session_kill_pipe) error in libevent_init\n"));
213
    close(session_add_pipe[0]);
214
    close(session_add_pipe[1]);
215
    return(1);
216
  }
217
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
218
            libevent_add_session_callback, NULL);
219
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
220
            libevent_kill_session_callback, NULL);
1 by brian
clean slate
221
 
520.1.22 by Brian Aker
Second pass of thd cleanup
222
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
1 by brian
clean slate
223
 {
520.1.22 by Brian Aker
Second pass of thd cleanup
224
   sql_print_error(_("session_add_event event_add error in libevent_init\n"));
1 by brian
clean slate
225
   libevent_end();
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
226
   return(1);
1 by brian
clean slate
227
   
228
 }
229
  /* Set up the thread pool */
230
  created_threads= killed_threads= 0;
231
  pthread_mutex_lock(&LOCK_thread_count);
232
233
  for (i= 0; i < thread_pool_size; i++)
234
  {
235
    pthread_t thread;
236
    int error;
237
    if ((error= pthread_create(&thread, &connection_attrib,
238
                               libevent_thread_proc, 0)))
239
    {
338 by Monty Taylor
Tagged more strings.
240
      sql_print_error(_("Can't create completion port thread (error %d)"),
1 by brian
clean slate
241
                      error);
242
      pthread_mutex_unlock(&LOCK_thread_count);
243
      libevent_end();                      // Cleanup
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
244
      return(true);
1 by brian
clean slate
245
    }
246
  }
247
248
  /* Wait until all threads are created */
249
  while (created_threads != thread_pool_size)
250
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
251
  pthread_mutex_unlock(&LOCK_thread_count);
252
  
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
253
  return(false);
1 by brian
clean slate
254
}
255
256
257
/*
258
  This is called when data is ready on the socket.
259
  
260
  NOTES
261
    This is only called by the thread that owns LOCK_event_loop.
262
  
520.1.22 by Brian Aker
Second pass of thd cleanup
263
    We add the session that got the data to sessions_need_processing, and 
1 by brian
clean slate
264
    cause the libevent event_loop() to terminate. Then this same thread will
520.1.22 by Brian Aker
Second pass of thd cleanup
265
    return from event_loop and pick the session value back up for processing.
1 by brian
clean slate
266
*/
267
268
void libevent_io_callback(int, short, void *ctx)
269
{    
270
  safe_mutex_assert_owner(&LOCK_event_loop);
520.1.22 by Brian Aker
Second pass of thd cleanup
271
  Session *session= (Session*)ctx;
272
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
273
  sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
1 by brian
clean slate
274
}
275
276
/*
277
  This is called when we have a thread we want to be killed.
278
  
279
  NOTES
280
    This is only called by the thread that owns LOCK_event_loop.
281
*/
282
520.1.22 by Brian Aker
Second pass of thd cleanup
283
void libevent_kill_session_callback(int Fd, short, void*)
1 by brian
clean slate
284
{    
285
  safe_mutex_assert_owner(&LOCK_event_loop);
286
287
  /* clear the pending events */
288
  char c;
289
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
290
  {}
291
520.1.22 by Brian Aker
Second pass of thd cleanup
292
  LIST* list= sessions_waiting_for_io;
1 by brian
clean slate
293
  while (list)
294
  {
520.1.22 by Brian Aker
Second pass of thd cleanup
295
    Session *session= (Session*)list->data;
1 by brian
clean slate
296
    list= list_rest(list);
520.1.22 by Brian Aker
Second pass of thd cleanup
297
    if (session->killed == Session::KILL_CONNECTION)
1 by brian
clean slate
298
    {
299
      /*
300
        Delete from libevent and add to the processing queue.
301
      */
520.1.22 by Brian Aker
Second pass of thd cleanup
302
      event_del(session->scheduler.io_event);
303
      sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
304
                                       &session->scheduler.list);
305
      sessions_need_processing= list_add(sessions_need_processing,
306
                                     &session->scheduler.list);
1 by brian
clean slate
307
    }
308
  }
309
}
310
311
312
/*
313
  This is used to add connections to the pool. This callback is invoked from
520.1.22 by Brian Aker
Second pass of thd cleanup
314
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
1 by brian
clean slate
315
  written to it.
316
  
317
  NOTES
318
    This is only called by the thread that owns LOCK_event_loop.
319
*/
320
520.1.22 by Brian Aker
Second pass of thd cleanup
321
void libevent_add_session_callback(int Fd, short, void *)
1 by brian
clean slate
322
{ 
323
  safe_mutex_assert_owner(&LOCK_event_loop);
324
325
  /* clear the pending events */
326
  char c;
327
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
328
  {}
329
520.1.22 by Brian Aker
Second pass of thd cleanup
330
  pthread_mutex_lock(&LOCK_session_add);
331
  while (sessions_need_adding)
1 by brian
clean slate
332
  {
520.1.22 by Brian Aker
Second pass of thd cleanup
333
    /* pop the first session off the list */
334
    Session* session= (Session*)sessions_need_adding->data;
335
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
1 by brian
clean slate
336
520.1.22 by Brian Aker
Second pass of thd cleanup
337
    pthread_mutex_unlock(&LOCK_session_add);
1 by brian
clean slate
338
    
520.1.22 by Brian Aker
Second pass of thd cleanup
339
    if (!session->scheduler.logged_in || libevent_should_close_connection(session))
1 by brian
clean slate
340
    {
341
      /*
520.1.22 by Brian Aker
Second pass of thd cleanup
342
        Add session to sessions_need_processing list. If it needs closing we'll close
1 by brian
clean slate
343
        it outside of event_loop().
344
      */
520.1.22 by Brian Aker
Second pass of thd cleanup
345
      sessions_need_processing= list_add(sessions_need_processing,
346
                                     &session->scheduler.list);
1 by brian
clean slate
347
    }
348
    else
349
    {
350
      /* Add to libevent */
520.1.22 by Brian Aker
Second pass of thd cleanup
351
      if (event_add(session->scheduler.io_event, NULL))
1 by brian
clean slate
352
      {
520.1.22 by Brian Aker
Second pass of thd cleanup
353
        sql_print_error(_("event_add error in libevent_add_session_callback\n"));
354
        libevent_connection_close(session);
1 by brian
clean slate
355
      } 
356
      else
357
      {
520.1.22 by Brian Aker
Second pass of thd cleanup
358
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
359
                                      &session->scheduler.list);
1 by brian
clean slate
360
      }
361
    }
520.1.22 by Brian Aker
Second pass of thd cleanup
362
    pthread_mutex_lock(&LOCK_session_add);
1 by brian
clean slate
363
  }
520.1.22 by Brian Aker
Second pass of thd cleanup
364
  pthread_mutex_unlock(&LOCK_session_add);
1 by brian
clean slate
365
}
366
367
368
/**
369
  Notify the thread pool about a new connection
370
371
  NOTES
372
    LOCK_thread_count is locked on entry. This function MUST unlock it!
373
*/
374
520.1.22 by Brian Aker
Second pass of thd cleanup
375
static void libevent_add_connection(Session *session)
{
  const bool init_failed= session->scheduler.init(session);

  if (!init_failed)
  {
    /* Register the session and queue it for the event loop. */
    threads.append(session);
    libevent_session_add(session);

    pthread_mutex_unlock(&LOCK_thread_count);
    return;
  }

  sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
  /* The lock is released before the close call. */
  pthread_mutex_unlock(&LOCK_thread_count);
  libevent_connection_close(session);
}
390
391
392
/**
393
  @brief Signal a waiting connection it's time to die.
394
 
520.1.21 by Brian Aker
THD -> Session rename
395
  @details This function will signal libevent the Session should be killed.
520.1.22 by Brian Aker
Second pass of thd cleanup
396
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
1 by brian
clean slate
397
    upon entry.
398
 
520.1.22 by Brian Aker
Second pass of thd cleanup
399
  @param[in]  session The connection to kill
1 by brian
clean slate
400
*/
401
520.1.21 by Brian Aker
THD -> Session rename
402
static void libevent_post_kill_notification(Session *)
{
  /*
    Note, we just wake up libevent with an event that a Session should be
    killed. It will search its list of sessions for
    session->killed == KILL_CONNECTION to find the Sessions it should kill.

    So we don't actually tell it which one and we don't actually use the
    Session being passed to us, but that's just a design detail that could
    change later.
  */
  char c= 0;

  /*
    The write must NOT live inside assert(): with NDEBUG defined the whole
    expression would be compiled out and the wake-up byte never sent.
  */
  const ssize_t written= write(session_kill_pipe[1], &c, sizeof(c));
  assert(written == (ssize_t) sizeof(c));
  (void) written;                       /* unused when NDEBUG is defined */
}
416
417
418
/*
419
  Close and delete a connection.
420
*/
421
520.1.22 by Brian Aker
Second pass of thd cleanup
422
static void libevent_connection_close(Session *session)
{
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages

  /* A negative descriptor means the connection was already closed. */
  const bool still_open= net_get_sd(&(session->net)) >= 0;
  if (still_open)
  {
    end_connection(session);
    close_connection(session, 0, 1);
  }

  session->scheduler.thread_detach();

  /* locks LOCK_thread_count and deletes session; we release the lock */
  unlink_session(session);
  pthread_mutex_unlock(&LOCK_thread_count);
}
437
438
439
/*
520.1.21 by Brian Aker
THD -> Session rename
440
  Returns true if we should close and delete a Session connection.
1 by brian
clean slate
441
*/
442
520.1.22 by Brian Aker
Second pass of thd cleanup
443
static bool libevent_should_close_connection(Session* session)
{
  /* The network layer asks for the connection to be closed ... */
  if (net_should_close(&(session->net)))
    return true;

  /* ... or the session has been marked as killed. */
  return session->killed == Session::KILL_CONNECTION;
}
448
449
450
/*
451
  libevent_thread_proc is the outer loop of each thread in the thread pool.
452
  These procs only return/terminate on shutdown (kill_pool_threads == true).
453
*/
454
212.1.3 by Monty Taylor
Renamed __attribute__((__unused__)) to __attribute__((unused)).
455
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
{
  if (init_new_connection_handler_thread())
  {
    my_thread_global_end();
    sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
    exit(1);
  }

  /*
    Count this thread as started; the thread that completes the pool wakes
    up libevent_init(), which is waiting on COND_thread_count.
  */
  (void) pthread_mutex_lock(&LOCK_thread_count);
  if (++created_threads == thread_pool_size)
    (void) pthread_cond_signal(&COND_thread_count);
  (void) pthread_mutex_unlock(&LOCK_thread_count);

  for (;;)
  {
    Session *session= NULL;

    /*
      Whoever holds LOCK_event_loop drives libevent until at least one
      session needs processing, or until shutdown is requested.
    */
    (void) pthread_mutex_lock(&LOCK_event_loop);
    while (!sessions_need_processing)
    {
      if (kill_pool_threads)
      {
        /* the flag that we should die has been set */
        (void) pthread_mutex_unlock(&LOCK_event_loop);
        goto thread_exit;
      }
      event_loop(EVLOOP_ONCE);
    }

    /* pop the first session off the list */
    session= (Session*) sessions_need_processing->data;
    sessions_need_processing= list_delete(sessions_need_processing,
                                          sessions_need_processing);
    (void) pthread_mutex_unlock(&LOCK_event_loop);

    /* From here on this thread works on the popped session. */

    /* set up the session<->thread links. */
    session->thread_stack= (char*) &session;

    if (session->scheduler.thread_attach())
    {
      libevent_connection_close(session);
      continue;
    }

    /* A connection that has not logged in yet must do so first. */
    if (!session->scheduler.logged_in)
    {
      if (login_connection(session))
      {
        /* Failed to log in */
        libevent_connection_close(session);
        continue;
      }

      /* login successful */
      session->scheduler.logged_in= true;
      prepare_new_connection_state(session);
      if (!libevent_needs_immediate_processing(session))
        continue; /* New connection is now waiting for data in libevent */
    }

    /*
      Run commands back to back for as long as the connection reports that
      it needs immediate processing; a failing command closes it.
    */
    for (;;)
    {
      if (do_command(session))
      {
        libevent_connection_close(session);
        break;
      }
      if (!libevent_needs_immediate_processing(session))
        break;
    }
  }

thread_exit:
  /* Report this thread as gone so libevent_end() can stop waiting. */
  (void) pthread_mutex_lock(&LOCK_thread_count);
  killed_threads++;
  pthread_cond_broadcast(&COND_thread_count);
  (void) pthread_mutex_unlock(&LOCK_thread_count);
  my_thread_end();
  pthread_exit(0);
  return(0);                               /* purify: deadcode */
}
548
549
550
/*
55 by brian
Update for using real bool types.
551
  Returns true if the connection needs immediate processing and false if 
1 by brian
clean slate
552
  instead it's queued for libevent processing or closed.
553
*/
554
520.1.22 by Brian Aker
Second pass of thd cleanup
555
static bool libevent_needs_immediate_processing(Session *session)
{
  if (libevent_should_close_connection(session))
  {
    libevent_connection_close(session);
    return false;
  }

  /*
    If more data is already in the socket buffer, the caller must process
    another command right away.

    Note: we cannot add for event processing because the whole request might
    already be buffered and we wouldn't receive an event.
  */
  const bool more_buffered= net_more_data(&(session->net));
  if (!more_buffered)
  {
    /* Nothing pending: release the thread and park the session in libevent. */
    session->scheduler.thread_detach();
    libevent_session_add(session);
  }

  return more_buffered;
}
575
576
577
/*
520.1.21 by Brian Aker
THD -> Session rename
578
  Adds a Session to the queue for libevent processing.
1 by brian
clean slate
579
  
580
  This call does not actually register the event with libevent.
520.1.21 by Brian Aker
THD -> Session rename
581
  Instead, it places the Session onto a queue and signals libevent by writing
520.1.22 by Brian Aker
Second pass of thd cleanup
582
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
520.1.21 by Brian Aker
THD -> Session rename
583
  be invoked which will find the Session on the queue and add it to libevent.
1 by brian
clean slate
584
*/
585
520.1.22 by Brian Aker
Second pass of thd cleanup
586
static void libevent_session_add(Session* session)
{
  char c= 0;

  pthread_mutex_lock(&LOCK_session_add);

  /* queue for libevent */
  sessions_need_adding= list_add(sessions_need_adding,
                                 &session->scheduler.list);

  /*
    notify libevent

    The write must NOT live inside assert(): with NDEBUG defined the whole
    expression would be compiled out, the event loop would never be woken,
    and the queued session would never be picked up.
  */
  const ssize_t written= write(session_add_pipe[1], &c, sizeof(c));
  assert(written == (ssize_t) sizeof(c));
  (void) written;                       /* unused when NDEBUG is defined */

  pthread_mutex_unlock(&LOCK_session_add);
}
596
597
598
/**
599
  Wait until all pool threads have been deleted for clean shutdown
600
*/
601
602
static void libevent_end()
603
{
604
  (void) pthread_mutex_lock(&LOCK_thread_count);
605
  
55 by brian
Update for using real bool types.
606
  kill_pool_threads= true;
1 by brian
clean slate
607
  while (killed_threads != created_threads)
608
  {
609
    /* wake up the event loop */
610
    char c= 0;
520.1.22 by Brian Aker
Second pass of thd cleanup
611
    assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
1 by brian
clean slate
612
613
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
614
  }
615
  (void) pthread_mutex_unlock(&LOCK_thread_count);
616
  
520.1.22 by Brian Aker
Second pass of thd cleanup
617
  event_del(&session_add_event);
618
  close(session_add_pipe[0]);
619
  close(session_add_pipe[1]);
620
  event_del(&session_kill_event);
621
  close(session_kill_pipe[0]);
622
  close(session_kill_pipe[1]);
1 by brian
clean slate
623
624
  (void) pthread_mutex_destroy(&LOCK_event_loop);
520.1.22 by Brian Aker
Second pass of thd cleanup
625
  (void) pthread_mutex_destroy(&LOCK_session_add);
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
626
  return;
1 by brian
clean slate
627
}
628
629
630
/*
  Fill in the scheduler function table for the libevent-based thread pool.
  Hooks not assigned here keep whatever value func already holds.
*/
void pool_of_threads_scheduler(scheduler_functions* func)
{
  /* Pool lifecycle */
  func->init= libevent_init;
  func->end= libevent_end;

  /* Per-connection hooks */
  func->add_connection= libevent_add_connection;
  func->post_kill_notification= libevent_post_kill_notification;

  /* Pool size limit */
  func->max_threads= thread_pool_size;
}