~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2007 MySQL AB
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
/*
17
  Implementation for the thread scheduler
18
*/
19
243.1.17 by Jay Pipes
FINAL PHASE removal of mysql_priv.h (Bye, bye my friend.)
20
#include <drizzled/server_includes.h>
383.1.54 by Monty Taylor
More violite refactoring/removal.
21
#include <libdrizzle/libdrizzle.h>
575.4.7 by Monty Taylor
More header cleanup.
22
#include <event.h>
549 by Monty Taylor
Took gettext.h out of header files.
23
#include <drizzled/gettext.h>
575.4.7 by Monty Taylor
More header cleanup.
24
#include <drizzled/sql_parse.h>
1 by brian
clean slate
25
26
27
/*
28
  'Dummy' functions to be used when we don't need any handling for a scheduler
29
  event
30
 */
31
32
/* No-op init hook: there is nothing to set up, so it always returns false. */
static bool init_dummy(void)
{
  return false;
}
520.1.22 by Brian Aker
Second pass of thd cleanup
33
/* No-op kill-notification hook; the session argument is ignored. */
static void post_kill_dummy(Session *session __attribute__((unused)))
{
}
1 by brian
clean slate
34
/* No-op shutdown hook. */
static void end_dummy(void)
{
}
520.1.22 by Brian Aker
Second pass of thd cleanup
35
static bool end_thread_dummy(Session *session __attribute__((unused)),
212.1.3 by Monty Taylor
Renamed __attribute__((__unused__)) to __attribute__((unused)).
36
                             bool cache_thread __attribute__((unused)))
77.1.45 by Monty Taylor
Warning fixes.
37
{ return 0; }
1 by brian
clean slate
38
39
/*
40
  Initialize default scheduler with dummy functions so that setup functions
41
  only need to declare those that are relevant for their usage
42
*/
43
44
/*
  Point every hook at its no-op dummy, except init_new_connection_thread
  (which gets init_new_connection_handler_thread) and add_connection, which
  has no default and is left null: each scheduler must install its own.
*/
scheduler_functions::scheduler_functions()
  : init(init_dummy),
    init_new_connection_thread(init_new_connection_handler_thread),
    add_connection(0),                           // Must be defined
    post_kill_notification(post_kill_dummy),
    end_thread(end_thread_dummy),
    end(end_dummy)
{
}
51
482 by Brian Aker
Remove uint.
52
static uint32_t created_threads, killed_threads;
1 by brian
clean slate
53
static bool kill_pool_threads;
54
520.1.22 by Brian Aker
Second pass of thd cleanup
55
static struct event session_add_event;
56
static struct event session_kill_event;
57
58
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
59
static LIST *sessions_need_adding;    /* list of sessions to add to libevent queue */
60
61
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
62
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
1 by brian
clean slate
63
64
/*
65
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
520.1.22 by Brian Aker
Second pass of thd cleanup
66
  event_del) and sessions_need_processing and sessions_waiting_for_io.
1 by brian
clean slate
67
*/
68
static pthread_mutex_t LOCK_event_loop;
520.1.22 by Brian Aker
Second pass of thd cleanup
69
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
70
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
1 by brian
clean slate
71
72
pthread_handler_t libevent_thread_proc(void *arg);
73
static void libevent_end();
520.1.22 by Brian Aker
Second pass of thd cleanup
74
static bool libevent_needs_immediate_processing(Session *session);
75
static void libevent_connection_close(Session *session);
76
static bool libevent_should_close_connection(Session* session);
77
static void libevent_session_add(Session* session);
1 by brian
clean slate
78
void libevent_io_callback(int Fd, short Operation, void *ctx);
520.1.22 by Brian Aker
Second pass of thd cleanup
79
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
80
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
1 by brian
clean slate
81
82
83
/*
84
  Create a pipe and set to non-blocking.
55 by brian
Update for using real bool types.
85
  Returns true if there is an error.
1 by brian
clean slate
86
*/
87
88
/* Switch one descriptor to non-blocking mode. Returns true on error. */
static bool set_pipe_end_nonblocking(int fd)
{
  int flags= fcntl(fd, F_GETFL);
  return flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1;
}

/*
  Create a pipe and set BOTH ends to non-blocking.

  pipe_fds receives the read end in [0] and the write end in [1].
  Returns true if there is an error, false on success.

  The previous version ended its return expression with a stray ';' after
  the read end had been handled, which left the fcntl() calls for the write
  end as unreachable code: the write end silently stayed blocking.

  If the pipe was created but could not be switched to non-blocking mode,
  both descriptors are closed again so that a failed call leaks nothing.
*/
static bool init_pipe(int pipe_fds[])
{
  if (pipe(pipe_fds) < 0)
    return true;

  if (set_pipe_end_nonblocking(pipe_fds[0]) ||
      set_pipe_end_nonblocking(pipe_fds[1]))
  {
    close(pipe_fds[0]);
    close(pipe_fds[1]);
    return true;
  }
  return false;
}
97
98
99
/*
520.1.22 by Brian Aker
Second pass of thd cleanup
100
  session_scheduler keeps the link between Session and events.
520.1.21 by Brian Aker
THD -> Session rename
101
  It's embedded in the Session class.
1 by brian
clean slate
102
*/
103
520.1.22 by Brian Aker
Second pass of thd cleanup
104
/* Start out not logged in, with no io_event allocated and no thread attached. */
session_scheduler::session_scheduler()
  : logged_in(false),
    io_event(NULL),
    thread_attached(false)
{
}
108
109
520.1.22 by Brian Aker
Second pass of thd cleanup
110
/* Release the io_event buffer; it is still NULL if init() never ran. */
session_scheduler::~session_scheduler()
{
  free(io_event);
}
114
115
520.1.22 by Brian Aker
Second pass of thd cleanup
116
/*
  Copy construction deliberately copies nothing from the source: the new
  object starts in the same state as a default-constructed one, so the
  io_event pointer is never shared between two objects.
*/
session_scheduler::session_scheduler(const session_scheduler&)
  : logged_in(false),
    io_event(NULL),
    thread_attached(false)
{
}
119
520.1.22 by Brian Aker
Second pass of thd cleanup
120
void session_scheduler::operator=(const session_scheduler&)
509.2.5 by Monty Taylor
Override copy ctr and op= because we have pointer members.
121
{}
122
520.1.22 by Brian Aker
Second pass of thd cleanup
123
/*
  Allocate the libevent read event for parent_session and link this
  scheduler's list node back to the session.

  Returns true if the event could not be allocated, false otherwise.
*/
bool session_scheduler::init(Session *parent_session)
{
  io_event= (struct event*)my_malloc(sizeof(*io_event),
                                     MYF(MY_ZEROFILL|MY_WME));
  if (io_event == NULL)
  {
    sql_print_error(_("Memory allocation error in session_scheduler::init\n"));
    return true;
  }

  /* Fire libevent_io_callback when the session's socket becomes readable. */
  const int session_fd= net_get_sd(&(parent_session->net));
  event_set(io_event, session_fd, EV_READ,
            libevent_io_callback, (void*)parent_session);

  list.data= parent_session;
  return false;
}
141
142
143
/*
144
  Attach/associate the connection with the OS thread, for command processing.
145
*/
146
520.1.22 by Brian Aker
Second pass of thd cleanup
147
bool session_scheduler::thread_attach()
1 by brian
clean slate
148
{
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
149
  assert(!thread_attached);
520.1.22 by Brian Aker
Second pass of thd cleanup
150
  Session* session = (Session*)list.data;
151
  if (libevent_should_close_connection(session) ||
152
      setup_connection_thread_globals(session))
1 by brian
clean slate
153
  {
55 by brian
Update for using real bool types.
154
    return true;
1 by brian
clean slate
155
  }
156
  my_errno= 0;
520.1.22 by Brian Aker
Second pass of thd cleanup
157
  session->mysys_var->abort= 0;
55 by brian
Update for using real bool types.
158
  thread_attached= true;
519 by Brian Aker
Small cleanup in scheduler
159
55 by brian
Update for using real bool types.
160
  return false;
1 by brian
clean slate
161
}
162
163
164
/*
165
  Detach/disassociate the connection with the OS thread.
166
*/
167
520.1.22 by Brian Aker
Second pass of thd cleanup
168
void session_scheduler::thread_detach()
1 by brian
clean slate
169
{
170
  if (thread_attached)
171
  {
520.1.22 by Brian Aker
Second pass of thd cleanup
172
    Session* session = (Session*)list.data;
173
    session->mysys_var= NULL;
55 by brian
Update for using real bool types.
174
    thread_attached= false;
1 by brian
clean slate
175
  }
176
}
177
178
/**
179
  Create all threads for the thread pool
180
181
  NOTES
182
    After threads are created we wait until all threads have signaled that
183
    they have started before we return
184
185
  RETURN
186
    0  ok
187
    1  We got an error creating the thread pool
188
       In this case we will abort all created threads
189
*/
190
191
static bool libevent_init(void)
192
{
482 by Brian Aker
Remove uint.
193
  uint32_t i;
1 by brian
clean slate
194
195
  event_init();
196
  
197
  created_threads= 0;
198
  killed_threads= 0;
55 by brian
Update for using real bool types.
199
  kill_pool_threads= false;
1 by brian
clean slate
200
  
201
  pthread_mutex_init(&LOCK_event_loop, NULL);
520.1.22 by Brian Aker
Second pass of thd cleanup
202
  pthread_mutex_init(&LOCK_session_add, NULL);
1 by brian
clean slate
203
  
520.1.22 by Brian Aker
Second pass of thd cleanup
204
  /* set up the pipe used to add new sessions to the event pool */
205
  if (init_pipe(session_add_pipe))
206
  {
207
    sql_print_error(_("init_pipe(session_add_pipe) error in libevent_init\n"));
208
    return(1);
209
  }
210
  /* set up the pipe used to kill sessions in the event queue */
211
  if (init_pipe(session_kill_pipe))
212
  {
213
    sql_print_error(_("init_pipe(session_kill_pipe) error in libevent_init\n"));
214
    close(session_add_pipe[0]);
215
    close(session_add_pipe[1]);
216
    return(1);
217
  }
218
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
219
            libevent_add_session_callback, NULL);
220
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
221
            libevent_kill_session_callback, NULL);
1 by brian
clean slate
222
 
520.1.22 by Brian Aker
Second pass of thd cleanup
223
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
1 by brian
clean slate
224
 {
520.1.22 by Brian Aker
Second pass of thd cleanup
225
   sql_print_error(_("session_add_event event_add error in libevent_init\n"));
1 by brian
clean slate
226
   libevent_end();
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
227
   return(1);
1 by brian
clean slate
228
   
229
 }
230
  /* Set up the thread pool */
231
  created_threads= killed_threads= 0;
232
  pthread_mutex_lock(&LOCK_thread_count);
233
234
  for (i= 0; i < thread_pool_size; i++)
235
  {
236
    pthread_t thread;
237
    int error;
238
    if ((error= pthread_create(&thread, &connection_attrib,
239
                               libevent_thread_proc, 0)))
240
    {
338 by Monty Taylor
Tagged more strings.
241
      sql_print_error(_("Can't create completion port thread (error %d)"),
1 by brian
clean slate
242
                      error);
243
      pthread_mutex_unlock(&LOCK_thread_count);
244
      libevent_end();                      // Cleanup
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
245
      return(true);
1 by brian
clean slate
246
    }
247
  }
248
249
  /* Wait until all threads are created */
250
  while (created_threads != thread_pool_size)
251
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
252
  pthread_mutex_unlock(&LOCK_thread_count);
253
  
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
254
  return(false);
1 by brian
clean slate
255
}
256
257
258
/*
259
  This is called when data is ready on the socket.
260
  
261
  NOTES
262
    This is only called by the thread that owns LOCK_event_loop.
263
  
520.1.22 by Brian Aker
Second pass of thd cleanup
264
    We add the session that got the data to sessions_need_processing, and 
1 by brian
clean slate
265
    cause the libevent event_loop() to terminate. Then this same thread will
520.1.22 by Brian Aker
Second pass of thd cleanup
266
    return from event_loop and pick the session value back up for processing.
1 by brian
clean slate
267
*/
268
269
void libevent_io_callback(int, short, void *ctx)
270
{    
271
  safe_mutex_assert_owner(&LOCK_event_loop);
520.1.22 by Brian Aker
Second pass of thd cleanup
272
  Session *session= (Session*)ctx;
273
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
274
  sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
1 by brian
clean slate
275
}
276
277
/*
278
  This is called when we have a thread we want to be killed.
279
  
280
  NOTES
281
    This is only called by the thread that owns LOCK_event_loop.
282
*/
283
520.1.22 by Brian Aker
Second pass of thd cleanup
284
void libevent_kill_session_callback(int Fd, short, void*)
1 by brian
clean slate
285
{    
286
  safe_mutex_assert_owner(&LOCK_event_loop);
287
288
  /* clear the pending events */
289
  char c;
290
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
291
  {}
292
520.1.22 by Brian Aker
Second pass of thd cleanup
293
  LIST* list= sessions_waiting_for_io;
1 by brian
clean slate
294
  while (list)
295
  {
520.1.22 by Brian Aker
Second pass of thd cleanup
296
    Session *session= (Session*)list->data;
1 by brian
clean slate
297
    list= list_rest(list);
520.1.22 by Brian Aker
Second pass of thd cleanup
298
    if (session->killed == Session::KILL_CONNECTION)
1 by brian
clean slate
299
    {
300
      /*
301
        Delete from libevent and add to the processing queue.
302
      */
520.1.22 by Brian Aker
Second pass of thd cleanup
303
      event_del(session->scheduler.io_event);
304
      sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
305
                                       &session->scheduler.list);
306
      sessions_need_processing= list_add(sessions_need_processing,
307
                                     &session->scheduler.list);
1 by brian
clean slate
308
    }
309
  }
310
}
311
312
313
/*
314
  This is used to add connections to the pool. This callback is invoked from
520.1.22 by Brian Aker
Second pass of thd cleanup
315
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
1 by brian
clean slate
316
  written to it.
317
  
318
  NOTES
319
    This is only called by the thread that owns LOCK_event_loop.
320
*/
321
520.1.22 by Brian Aker
Second pass of thd cleanup
322
void libevent_add_session_callback(int Fd, short, void *)
1 by brian
clean slate
323
{ 
324
  safe_mutex_assert_owner(&LOCK_event_loop);
325
326
  /* clear the pending events */
327
  char c;
328
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
329
  {}
330
520.1.22 by Brian Aker
Second pass of thd cleanup
331
  pthread_mutex_lock(&LOCK_session_add);
332
  while (sessions_need_adding)
1 by brian
clean slate
333
  {
520.1.22 by Brian Aker
Second pass of thd cleanup
334
    /* pop the first session off the list */
335
    Session* session= (Session*)sessions_need_adding->data;
336
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
1 by brian
clean slate
337
520.1.22 by Brian Aker
Second pass of thd cleanup
338
    pthread_mutex_unlock(&LOCK_session_add);
1 by brian
clean slate
339
    
520.1.22 by Brian Aker
Second pass of thd cleanup
340
    if (!session->scheduler.logged_in || libevent_should_close_connection(session))
1 by brian
clean slate
341
    {
342
      /*
520.1.22 by Brian Aker
Second pass of thd cleanup
343
        Add session to sessions_need_processing list. If it needs closing we'll close
1 by brian
clean slate
344
        it outside of event_loop().
345
      */
520.1.22 by Brian Aker
Second pass of thd cleanup
346
      sessions_need_processing= list_add(sessions_need_processing,
347
                                     &session->scheduler.list);
1 by brian
clean slate
348
    }
349
    else
350
    {
351
      /* Add to libevent */
520.1.22 by Brian Aker
Second pass of thd cleanup
352
      if (event_add(session->scheduler.io_event, NULL))
1 by brian
clean slate
353
      {
520.1.22 by Brian Aker
Second pass of thd cleanup
354
        sql_print_error(_("event_add error in libevent_add_session_callback\n"));
355
        libevent_connection_close(session);
1 by brian
clean slate
356
      } 
357
      else
358
      {
520.1.22 by Brian Aker
Second pass of thd cleanup
359
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
360
                                      &session->scheduler.list);
1 by brian
clean slate
361
      }
362
    }
520.1.22 by Brian Aker
Second pass of thd cleanup
363
    pthread_mutex_lock(&LOCK_session_add);
1 by brian
clean slate
364
  }
520.1.22 by Brian Aker
Second pass of thd cleanup
365
  pthread_mutex_unlock(&LOCK_session_add);
1 by brian
clean slate
366
}
367
368
369
/**
370
  Notify the thread pool about a new connection
371
372
  NOTES
373
    LOCK_thread_count is locked on entry. This function MUST unlock it!
374
*/
375
520.1.22 by Brian Aker
Second pass of thd cleanup
376
/*
  Hand a new connection to the pool. LOCK_thread_count is unlocked here on
  both paths. If the session's scheduler cannot be initialised, the
  connection is closed after the lock has been released.
*/
static void libevent_add_connection(Session *session)
{
  const bool init_failed= session->scheduler.init(session);
  if (!init_failed)
  {
    threads.append(session);
    libevent_session_add(session);
    pthread_mutex_unlock(&LOCK_thread_count);
    return;
  }

  sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
  pthread_mutex_unlock(&LOCK_thread_count);
  libevent_connection_close(session);
}
391
392
393
/**
394
  @brief Signal a waiting connection it's time to die.
395
 
520.1.21 by Brian Aker
THD -> Session rename
396
  @details This function will signal libevent the Session should be killed.
520.1.22 by Brian Aker
Second pass of thd cleanup
397
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
1 by brian
clean slate
398
    upon entry.
399
 
520.1.22 by Brian Aker
Second pass of thd cleanup
400
  @param[in]  session The connection to kill
1 by brian
clean slate
401
*/
402
520.1.21 by Brian Aker
THD -> Session rename
403
/*
  Wake libevent by writing one byte to session_kill_pipe.

  The Session argument is not used: libevent_kill_session_callback scans
  sessions_waiting_for_io for entries whose killed state is KILL_CONNECTION,
  so libevent is only told that something should be killed, not what.

  The write() is done outside assert() on purpose. When it was the assert
  argument, a build with NDEBUG removed the call altogether and the kill
  notification was never delivered.
*/
static void libevent_post_kill_notification(Session *)
{
  char c= 0;
  const bool notified=
    (write(session_kill_pipe[1], &c, sizeof(c)) == sizeof(c));
  assert(notified);
  (void) notified;                         /* unused when NDEBUG is defined */
}
417
418
419
/*
420
  Close and delete a connection.
421
*/
422
520.1.22 by Brian Aker
Second pass of thd cleanup
423
/*
  Close and delete a connection: mark the session as killed, end and close
  the connection if its socket is still open, detach it from the thread,
  then unlink the session and release LOCK_thread_count.
*/
static void libevent_connection_close(Session *session)
{
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages

  const bool still_open= net_get_sd(&(session->net)) >= 0;
  if (still_open)
  {
    end_connection(session);
    close_connection(session, 0, 1);
  }

  session->scheduler.thread_detach();
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
  pthread_mutex_unlock(&LOCK_thread_count);
}
438
439
440
/*
520.1.21 by Brian Aker
THD -> Session rename
441
  Returns true if we should close and delete a Session connection.
1 by brian
clean slate
442
*/
443
520.1.22 by Brian Aker
Second pass of thd cleanup
444
/*
  Returns true if the connection should be closed and deleted: either
  net_should_close() reports it, or the session's killed state is
  KILL_CONNECTION.
*/
static bool libevent_should_close_connection(Session* session)
{
  if (net_should_close(&(session->net)))
    return true;
  return session->killed == Session::KILL_CONNECTION;
}
449
450
451
/*
452
  libevent_thread_proc is the outer loop of each thread in the thread pool.
453
  These procs only return/terminate on shutdown (kill_pool_threads == true).
454
*/
455
212.1.3 by Monty Taylor
Renamed __attribute__((__unused__)) to __attribute__((unused)).
456
/*
  Main function of every pool thread. It alternates between running the
  libevent loop (under LOCK_event_loop) until a session needs processing,
  and processing one session: attach, log in if necessary, run commands.
  The thread only leaves the loop when kill_pool_threads is set and no
  session is queued.
*/
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
{
  if (init_new_connection_handler_thread())
  {
    my_thread_global_end();
    sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
    exit(1);
  }

  /*
    Signal libevent_init() when all threads have been created and are ready
    to receive events.
  */
  pthread_mutex_lock(&LOCK_thread_count);
  created_threads++;
  if (created_threads == thread_pool_size)
    pthread_cond_signal(&COND_thread_count);
  pthread_mutex_unlock(&LOCK_thread_count);

  for (;;)
  {
    Session *session= NULL;

    /* get session(s) to process */
    pthread_mutex_lock(&LOCK_event_loop);
    while (!sessions_need_processing && !kill_pool_threads)
      event_loop(EVLOOP_ONCE);

    if (!sessions_need_processing)
    {
      /* Nothing is queued, so the wait ended because we were told to die. */
      pthread_mutex_unlock(&LOCK_event_loop);
      break;
    }

    /* pop the first session off the list */
    session= (Session*)sessions_need_processing->data;
    sessions_need_processing= list_delete(sessions_need_processing,
                                          sessions_need_processing);
    pthread_mutex_unlock(&LOCK_event_loop);

    /* now we process the connection (session) */

    /* set up the session<->thread links. */
    session->thread_stack= (char*) &session;

    if (session->scheduler.thread_attach())
    {
      libevent_connection_close(session);
      continue;
    }

    /* is the connection logged in yet? */
    if (!session->scheduler.logged_in)
    {
      if (login_connection(session))
      {
        /* Failed to log in */
        libevent_connection_close(session);
        continue;
      }

      /* login successful */
      session->scheduler.logged_in= true;
      prepare_new_connection_state(session);
      if (!libevent_needs_immediate_processing(session))
        continue; /* New connection is now waiting for data in libevent */
    }

    /* Run commands for as long as the session needs immediate processing. */
    do
    {
      /* Process a query */
      if (do_command(session))
      {
        libevent_connection_close(session);
        break;
      }
    } while (libevent_needs_immediate_processing(session));
  }

  /* Tell libevent_end() that one more pool thread is gone. */
  pthread_mutex_lock(&LOCK_thread_count);
  killed_threads++;
  pthread_cond_broadcast(&COND_thread_count);
  pthread_mutex_unlock(&LOCK_thread_count);
  my_thread_end();
  pthread_exit(0);
  return(0);                               /* purify: deadcode */
}
549
550
551
/*
55 by brian
Update for using real bool types.
552
  Returns true if the connection needs immediate processing and false if 
1 by brian
clean slate
553
  instead it's queued for libevent processing or closed.
554
*/
555
520.1.22 by Brian Aker
Second pass of thd cleanup
556
/*
  Decide what happens to a session after a command (or a login).

  Returns true when net_more_data() reports more input, so the caller should
  process another command right away. Returns false when the session was
  closed, or was detached and queued for libevent.
*/
static bool libevent_needs_immediate_processing(Session *session)
{
  if (libevent_should_close_connection(session))
  {
    libevent_connection_close(session);
    return false;
  }

  /*
    Note: we cannot add for event processing when data is already buffered,
    because the whole request might be in the buffer and we wouldn't
    receive an event.
  */
  if (!net_more_data(&(session->net)))
  {
    session->scheduler.thread_detach();
    libevent_session_add(session);
    return false;
  }

  return true;
}
576
577
578
/*
520.1.21 by Brian Aker
THD -> Session rename
579
  Queues a Session for libevent processing.
1 by brian
clean slate
580
  
581
  This call does not actually register the event with libevent.
520.1.21 by Brian Aker
THD -> Session rename
582
  Instead, it places the Session onto a queue and signals libevent by writing
520.1.22 by Brian Aker
Second pass of thd cleanup
583
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
520.1.21 by Brian Aker
THD -> Session rename
584
  be invoked which will find the Session on the queue and add it to libevent.
1 by brian
clean slate
585
*/
586
520.1.22 by Brian Aker
Second pass of thd cleanup
587
/*
  Put a session on sessions_need_adding and wake libevent by writing one
  byte to session_add_pipe. Both steps are done under LOCK_session_add.

  The write() is done outside assert() on purpose. When it was the assert
  argument, a build with NDEBUG removed the call altogether, so a queued
  session was never picked up by libevent_add_session_callback.
*/
static void libevent_session_add(Session* session)
{
  char c= 0;

  pthread_mutex_lock(&LOCK_session_add);
  /* queue for libevent */
  sessions_need_adding= list_add(sessions_need_adding,
                                 &session->scheduler.list);
  /* notify libevent */
  const bool notified=
    (write(session_add_pipe[1], &c, sizeof(c)) == sizeof(c));
  assert(notified);
  (void) notified;                         /* unused when NDEBUG is defined */
  pthread_mutex_unlock(&LOCK_session_add);
}
597
598
599
/**
600
  Wait until all pool threads have been deleted for clean shutdown
601
*/
602
603
static void libevent_end()
604
{
605
  (void) pthread_mutex_lock(&LOCK_thread_count);
606
  
55 by brian
Update for using real bool types.
607
  kill_pool_threads= true;
1 by brian
clean slate
608
  while (killed_threads != created_threads)
609
  {
610
    /* wake up the event loop */
611
    char c= 0;
520.1.22 by Brian Aker
Second pass of thd cleanup
612
    assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
1 by brian
clean slate
613
614
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
615
  }
616
  (void) pthread_mutex_unlock(&LOCK_thread_count);
617
  
520.1.22 by Brian Aker
Second pass of thd cleanup
618
  event_del(&session_add_event);
619
  close(session_add_pipe[0]);
620
  close(session_add_pipe[1]);
621
  event_del(&session_kill_event);
622
  close(session_kill_pipe[0]);
623
  close(session_kill_pipe[1]);
1 by brian
clean slate
624
625
  (void) pthread_mutex_destroy(&LOCK_event_loop);
520.1.22 by Brian Aker
Second pass of thd cleanup
626
  (void) pthread_mutex_destroy(&LOCK_session_add);
51.1.45 by Jay Pipes
Removed/replaced DBUG symbols and standardized TRUE/FALSE
627
  return;
1 by brian
clean slate
628
}
629
630
631
/*
  Fill in a scheduler_functions table for the pool-of-threads scheduler.
  Hooks that are not assigned here keep whatever value func already holds.
*/
void pool_of_threads_scheduler(scheduler_functions* func)
{
  /* Lifecycle hooks. */
  func->init= libevent_init;
  func->end=  libevent_end;

  /* Per-connection hooks. */
  func->add_connection= libevent_add_connection;
  func->post_kill_notification= libevent_post_kill_notification;

  func->max_threads= thread_pool_size;
}