~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/scheduler.cc

  • Committer: Brian Aker
  • Date: 2008-10-20 04:28:21 UTC
  • mto: (492.3.21 drizzle-clean-code)
  • mto: This revision was merged to the branch mainline in revision 530.
  • Revision ID: brian@tangent.org-20081020042821-rqqdrccuu8195k3y
Second pass of thd cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
 */
29
29
 
30
30
static bool init_dummy(void) {return 0;}
31
 
static void post_kill_dummy(Session *thd __attribute__((unused))) {}
 
31
static void post_kill_dummy(Session *session __attribute__((unused))) {}
32
32
static void end_dummy(void) {}
33
 
static bool end_thread_dummy(Session *thd __attribute__((unused)),
 
33
static bool end_thread_dummy(Session *session __attribute__((unused)),
34
34
                             bool cache_thread __attribute__((unused)))
35
35
{ return 0; }
36
36
 
50
50
static uint32_t created_threads, killed_threads;
51
51
static bool kill_pool_threads;
52
52
 
53
 
static struct event thd_add_event;
54
 
static struct event thd_kill_event;
55
 
 
56
 
static pthread_mutex_t LOCK_thd_add;    /* protects thds_need_adding */
57
 
static LIST *thds_need_adding;    /* list of thds to add to libevent queue */
58
 
 
59
 
static int thd_add_pipe[2]; /* pipe to signal add a connection to libevent*/
60
 
static int thd_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
53
static struct event session_add_event;
 
54
static struct event session_kill_event;
 
55
 
 
56
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
 
57
static LIST *sessions_need_adding;    /* list of sessions to add to libevent queue */
 
58
 
 
59
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
60
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
61
61
 
62
62
/*
63
63
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
64
 
  event_del) and thds_need_processing and thds_waiting_for_io.
 
64
  event_del) and sessions_need_processing and sessions_waiting_for_io.
65
65
*/
66
66
static pthread_mutex_t LOCK_event_loop;
67
 
static LIST *thds_need_processing; /* list of thds that needs some processing */
68
 
static LIST *thds_waiting_for_io; /* list of thds with added events */
 
67
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
 
68
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
69
69
 
70
70
pthread_handler_t libevent_thread_proc(void *arg);
71
71
static void libevent_end();
72
 
static bool libevent_needs_immediate_processing(Session *thd);
73
 
static void libevent_connection_close(Session *thd);
74
 
static bool libevent_should_close_connection(Session* thd);
75
 
static void libevent_thd_add(Session* thd);
 
72
static bool libevent_needs_immediate_processing(Session *session);
 
73
static void libevent_connection_close(Session *session);
 
74
static bool libevent_should_close_connection(Session* session);
 
75
static void libevent_session_add(Session* session);
76
76
void libevent_io_callback(int Fd, short Operation, void *ctx);
77
 
void libevent_add_thd_callback(int Fd, short Operation, void *ctx);
78
 
void libevent_kill_thd_callback(int Fd, short Operation, void *ctx);
 
77
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
 
78
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
79
79
 
80
80
 
81
81
/*
95
95
 
96
96
 
97
97
/*
98
 
  thd_scheduler keeps the link between Session and events.
 
98
  session_scheduler keeps the link between Session and events.
99
99
  It's embedded in the Session class.
100
100
*/
101
101
 
102
 
thd_scheduler::thd_scheduler()
 
102
session_scheduler::session_scheduler()
103
103
  : logged_in(false), io_event(NULL), thread_attached(false)
104
104
{  
105
105
}
106
106
 
107
107
 
108
 
thd_scheduler::~thd_scheduler()
 
108
session_scheduler::~session_scheduler()
109
109
{
110
110
  free(io_event);
111
111
}
112
112
 
113
113
 
114
 
thd_scheduler::thd_scheduler(const thd_scheduler&)
 
114
session_scheduler::session_scheduler(const session_scheduler&)
115
115
  : logged_in(false), io_event(NULL), thread_attached(false)
116
116
{}
117
117
 
118
 
void thd_scheduler::operator=(const thd_scheduler&)
 
118
void session_scheduler::operator=(const session_scheduler&)
119
119
{}
120
120
 
121
 
bool thd_scheduler::init(Session *parent_thd)
 
121
bool session_scheduler::init(Session *parent_session)
122
122
{
123
123
  io_event=
124
124
    (struct event*)my_malloc(sizeof(*io_event),MYF(MY_ZEROFILL|MY_WME));
125
125
    
126
126
  if (!io_event)
127
127
  {
128
 
    sql_print_error(_("Memory allocation error in thd_scheduler::init\n"));
 
128
    sql_print_error(_("Memory allocation error in session_scheduler::init\n"));
129
129
    return true;
130
130
  }
131
131
  
132
 
  event_set(io_event, net_get_sd(&(parent_thd->net)), EV_READ, 
133
 
            libevent_io_callback, (void*)parent_thd);
 
132
  event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ, 
 
133
            libevent_io_callback, (void*)parent_session);
134
134
    
135
 
  list.data= parent_thd;
 
135
  list.data= parent_session;
136
136
  
137
137
  return false;
138
138
}
142
142
  Attach/associate the connection with the OS thread, for command processing.
143
143
*/
144
144
 
145
 
bool thd_scheduler::thread_attach()
 
145
bool session_scheduler::thread_attach()
146
146
{
147
147
  assert(!thread_attached);
148
 
  Session* thd = (Session*)list.data;
149
 
  if (libevent_should_close_connection(thd) ||
150
 
      setup_connection_thread_globals(thd))
 
148
  Session* session = (Session*)list.data;
 
149
  if (libevent_should_close_connection(session) ||
 
150
      setup_connection_thread_globals(session))
151
151
  {
152
152
    return true;
153
153
  }
154
154
  my_errno= 0;
155
 
  thd->mysys_var->abort= 0;
 
155
  session->mysys_var->abort= 0;
156
156
  thread_attached= true;
157
157
 
158
158
  return false;
163
163
  Detach/disassociate the connection with the OS thread.
164
164
*/
165
165
 
166
 
void thd_scheduler::thread_detach()
 
166
void session_scheduler::thread_detach()
167
167
{
168
168
  if (thread_attached)
169
169
  {
170
 
    Session* thd = (Session*)list.data;
171
 
    thd->mysys_var= NULL;
 
170
    Session* session = (Session*)list.data;
 
171
    session->mysys_var= NULL;
172
172
    thread_attached= false;
173
173
  }
174
174
}
197
197
  kill_pool_threads= false;
198
198
  
199
199
  pthread_mutex_init(&LOCK_event_loop, NULL);
200
 
  pthread_mutex_init(&LOCK_thd_add, NULL);
 
200
  pthread_mutex_init(&LOCK_session_add, NULL);
201
201
  
202
 
  /* set up the pipe used to add new thds to the event pool */
203
 
  if (init_pipe(thd_add_pipe))
204
 
  {
205
 
    sql_print_error(_("init_pipe(thd_add_pipe) error in libevent_init\n"));
206
 
    return(1);
207
 
  }
208
 
  /* set up the pipe used to kill thds in the event queue */
209
 
  if (init_pipe(thd_kill_pipe))
210
 
  {
211
 
    sql_print_error(_("init_pipe(thd_kill_pipe) error in libevent_init\n"));
212
 
    close(thd_add_pipe[0]);
213
 
    close(thd_add_pipe[1]);
214
 
    return(1);
215
 
  }
216
 
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
217
 
            libevent_add_thd_callback, NULL);
218
 
  event_set(&thd_kill_event, thd_kill_pipe[0], EV_READ|EV_PERSIST,
219
 
            libevent_kill_thd_callback, NULL);
 
202
  /* set up the pipe used to add new sessions to the event pool */
 
203
  if (init_pipe(session_add_pipe))
 
204
  {
 
205
    sql_print_error(_("init_pipe(session_add_pipe) error in libevent_init\n"));
 
206
    return(1);
 
207
  }
 
208
  /* set up the pipe used to kill sessions in the event queue */
 
209
  if (init_pipe(session_kill_pipe))
 
210
  {
 
211
    sql_print_error(_("init_pipe(session_kill_pipe) error in libevent_init\n"));
 
212
    close(session_add_pipe[0]);
 
213
    close(session_add_pipe[1]);
 
214
    return(1);
 
215
  }
 
216
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
 
217
            libevent_add_session_callback, NULL);
 
218
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
 
219
            libevent_kill_session_callback, NULL);
220
220
 
221
 
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
 
221
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
222
222
 {
223
 
   sql_print_error(_("thd_add_event event_add error in libevent_init\n"));
 
223
   sql_print_error(_("session_add_event event_add error in libevent_init\n"));
224
224
   libevent_end();
225
225
   return(1);
226
226
   
259
259
  NOTES
260
260
    This is only called by the thread that owns LOCK_event_loop.
261
261
  
262
 
    We add the thd that got the data to thds_need_processing, and 
 
262
    We add the session that got the data to sessions_need_processing, and 
263
263
    cause the libevent event_loop() to terminate. Then this same thread will
264
 
    return from event_loop and pick the thd value back up for processing.
 
264
    return from event_loop and pick the session value back up for processing.
265
265
*/
266
266
 
267
267
void libevent_io_callback(int, short, void *ctx)
268
268
{    
269
269
  safe_mutex_assert_owner(&LOCK_event_loop);
270
 
  Session *thd= (Session*)ctx;
271
 
  thds_waiting_for_io= list_delete(thds_waiting_for_io, &thd->scheduler.list);
272
 
  thds_need_processing= list_add(thds_need_processing, &thd->scheduler.list);
 
270
  Session *session= (Session*)ctx;
 
271
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
 
272
  sessions_need_processing= list_add(sessions_need_processing, &session->scheduler.list);
273
273
}
274
274
 
275
275
/*
279
279
    This is only called by the thread that owns LOCK_event_loop.
280
280
*/
281
281
 
282
 
void libevent_kill_thd_callback(int Fd, short, void*)
 
282
void libevent_kill_session_callback(int Fd, short, void*)
283
283
{    
284
284
  safe_mutex_assert_owner(&LOCK_event_loop);
285
285
 
288
288
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
289
289
  {}
290
290
 
291
 
  LIST* list= thds_waiting_for_io;
 
291
  LIST* list= sessions_waiting_for_io;
292
292
  while (list)
293
293
  {
294
 
    Session *thd= (Session*)list->data;
 
294
    Session *session= (Session*)list->data;
295
295
    list= list_rest(list);
296
 
    if (thd->killed == Session::KILL_CONNECTION)
 
296
    if (session->killed == Session::KILL_CONNECTION)
297
297
    {
298
298
      /*
299
299
        Delete from libevent and add to the processing queue.
300
300
      */
301
 
      event_del(thd->scheduler.io_event);
302
 
      thds_waiting_for_io= list_delete(thds_waiting_for_io,
303
 
                                       &thd->scheduler.list);
304
 
      thds_need_processing= list_add(thds_need_processing,
305
 
                                     &thd->scheduler.list);
 
301
      event_del(session->scheduler.io_event);
 
302
      sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
 
303
                                       &session->scheduler.list);
 
304
      sessions_need_processing= list_add(sessions_need_processing,
 
305
                                     &session->scheduler.list);
306
306
    }
307
307
  }
308
308
}
310
310
 
311
311
/*
312
312
  This is used to add connections to the pool. This callback is invoked from
313
 
  the libevent event_loop() call whenever the thd_add_pipe[1] pipe has a byte
 
313
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
314
314
  written to it.
315
315
  
316
316
  NOTES
317
317
    This is only called by the thread that owns LOCK_event_loop.
318
318
*/
319
319
 
320
 
void libevent_add_thd_callback(int Fd, short, void *)
 
320
void libevent_add_session_callback(int Fd, short, void *)
321
321
322
322
  safe_mutex_assert_owner(&LOCK_event_loop);
323
323
 
326
326
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
327
327
  {}
328
328
 
329
 
  pthread_mutex_lock(&LOCK_thd_add);
330
 
  while (thds_need_adding)
 
329
  pthread_mutex_lock(&LOCK_session_add);
 
330
  while (sessions_need_adding)
331
331
  {
332
 
    /* pop the first thd off the list */
333
 
    Session* thd= (Session*)thds_need_adding->data;
334
 
    thds_need_adding= list_delete(thds_need_adding, thds_need_adding);
 
332
    /* pop the first session off the list */
 
333
    Session* session= (Session*)sessions_need_adding->data;
 
334
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
335
335
 
336
 
    pthread_mutex_unlock(&LOCK_thd_add);
 
336
    pthread_mutex_unlock(&LOCK_session_add);
337
337
    
338
 
    if (!thd->scheduler.logged_in || libevent_should_close_connection(thd))
 
338
    if (!session->scheduler.logged_in || libevent_should_close_connection(session))
339
339
    {
340
340
      /*
341
 
        Add thd to thds_need_processing list. If it needs closing we'll close
 
341
        Add session to sessions_need_processing list. If it needs closing we'll close
342
342
        it outside of event_loop().
343
343
      */
344
 
      thds_need_processing= list_add(thds_need_processing,
345
 
                                     &thd->scheduler.list);
 
344
      sessions_need_processing= list_add(sessions_need_processing,
 
345
                                     &session->scheduler.list);
346
346
    }
347
347
    else
348
348
    {
349
349
      /* Add to libevent */
350
 
      if (event_add(thd->scheduler.io_event, NULL))
 
350
      if (event_add(session->scheduler.io_event, NULL))
351
351
      {
352
 
        sql_print_error(_("event_add error in libevent_add_thd_callback\n"));
353
 
        libevent_connection_close(thd);
 
352
        sql_print_error(_("event_add error in libevent_add_session_callback\n"));
 
353
        libevent_connection_close(session);
354
354
      } 
355
355
      else
356
356
      {
357
 
        thds_waiting_for_io= list_add(thds_waiting_for_io,
358
 
                                      &thd->scheduler.list);
 
357
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
 
358
                                      &session->scheduler.list);
359
359
      }
360
360
    }
361
 
    pthread_mutex_lock(&LOCK_thd_add);
 
361
    pthread_mutex_lock(&LOCK_session_add);
362
362
  }
363
 
  pthread_mutex_unlock(&LOCK_thd_add);
 
363
  pthread_mutex_unlock(&LOCK_session_add);
364
364
}
365
365
 
366
366
 
371
371
    LOCK_thread_count is locked on entry. This function MUST unlock it!
372
372
*/
373
373
 
374
 
static void libevent_add_connection(Session *thd)
 
374
static void libevent_add_connection(Session *session)
375
375
{
376
 
  if (thd->scheduler.init(thd))
 
376
  if (session->scheduler.init(session))
377
377
  {
378
378
    sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
379
379
    pthread_mutex_unlock(&LOCK_thread_count);
380
 
    libevent_connection_close(thd);
 
380
    libevent_connection_close(session);
381
381
    return;
382
382
  }
383
 
  threads.append(thd);
384
 
  libevent_thd_add(thd);
 
383
  threads.append(session);
 
384
  libevent_session_add(session);
385
385
  
386
386
  pthread_mutex_unlock(&LOCK_thread_count);
387
387
  return;
392
392
  @brief Signal a waiting connection it's time to die.
393
393
 
394
394
  @details This function will signal libevent the Session should be killed.
395
 
    Either the global LOCK_thd_count or the Session's LOCK_delete must be locked
 
395
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
396
396
    upon entry.
397
397
 
398
 
  @param[in]  thd The connection to kill
 
398
  @param[in]  session The connection to kill
399
399
*/
400
400
 
401
401
static void libevent_post_kill_notification(Session *)
402
402
{
403
403
  /*
404
404
    Note, we just wake up libevent with an event that a Session should be killed,
405
 
    It will search its list of thds for thd->killed ==  KILL_CONNECTION to
 
405
    It will search its list of sessions for session->killed ==  KILL_CONNECTION to
406
406
    find the Sessions it should kill.
407
407
    
408
408
    So we don't actually tell it which one and we don't actually use the
410
410
    later.
411
411
  */
412
412
  char c= 0;
413
 
  assert(write(thd_kill_pipe[1], &c, sizeof(c))==sizeof(c));
 
413
  assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
414
414
}
415
415
 
416
416
 
418
418
  Close and delete a connection.
419
419
*/
420
420
 
421
 
static void libevent_connection_close(Session *thd)
 
421
static void libevent_connection_close(Session *session)
422
422
{
423
 
  thd->killed= Session::KILL_CONNECTION;          // Avoid error messages
 
423
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages
424
424
 
425
 
  if (net_get_sd(&(thd->net)) >= 0)                  // not already closed
 
425
  if (net_get_sd(&(session->net)) >= 0)                  // not already closed
426
426
  {
427
 
    end_connection(thd);
428
 
    close_connection(thd, 0, 1);
 
427
    end_connection(session);
 
428
    close_connection(session, 0, 1);
429
429
  }
430
 
  thd->scheduler.thread_detach();
431
 
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
 
430
  session->scheduler.thread_detach();
 
431
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
432
432
  pthread_mutex_unlock(&LOCK_thread_count);
433
433
 
434
434
  return;
439
439
  Returns true if we should close and delete a Session connection.
440
440
*/
441
441
 
442
 
static bool libevent_should_close_connection(Session* thd)
 
442
static bool libevent_should_close_connection(Session* session)
443
443
{
444
 
  return net_should_close(&(thd->net)) ||
445
 
         thd->killed == Session::KILL_CONNECTION;
 
444
  return net_should_close(&(session->net)) ||
 
445
         session->killed == Session::KILL_CONNECTION;
446
446
}
447
447
 
448
448
 
472
472
  
473
473
  for (;;)
474
474
  {
475
 
    Session *thd= NULL;
 
475
    Session *session= NULL;
476
476
    (void) pthread_mutex_lock(&LOCK_event_loop);
477
477
    
478
 
    /* get thd(s) to process */
479
 
    while (!thds_need_processing)
 
478
    /* get session(s) to process */
 
479
    while (!sessions_need_processing)
480
480
    {
481
481
      if (kill_pool_threads)
482
482
      {
487
487
      event_loop(EVLOOP_ONCE);
488
488
    }
489
489
    
490
 
    /* pop the first thd off the list */
491
 
    thd= (Session*)thds_need_processing->data;
492
 
    thds_need_processing= list_delete(thds_need_processing,
493
 
                                      thds_need_processing);
 
490
    /* pop the first session off the list */
 
491
    session= (Session*)sessions_need_processing->data;
 
492
    sessions_need_processing= list_delete(sessions_need_processing,
 
493
                                      sessions_need_processing);
494
494
    
495
495
    (void) pthread_mutex_unlock(&LOCK_event_loop);
496
496
    
497
 
    /* now we process the connection (thd) */
498
 
    
499
 
    /* set up the thd<->thread links. */
500
 
    thd->thread_stack= (char*) &thd;
501
 
    
502
 
    if (thd->scheduler.thread_attach())
 
497
    /* now we process the connection (session) */
 
498
    
 
499
    /* set up the session<->thread links. */
 
500
    session->thread_stack= (char*) &session;
 
501
    
 
502
    if (session->scheduler.thread_attach())
503
503
    {
504
 
      libevent_connection_close(thd);
 
504
      libevent_connection_close(session);
505
505
      continue;
506
506
    }
507
507
 
508
508
    /* is the connection logged in yet? */
509
 
    if (!thd->scheduler.logged_in)
 
509
    if (!session->scheduler.logged_in)
510
510
    {
511
 
      if (login_connection(thd))
 
511
      if (login_connection(session))
512
512
      {
513
513
        /* Failed to log in */
514
 
        libevent_connection_close(thd);
 
514
        libevent_connection_close(session);
515
515
        continue;
516
516
      }
517
517
      else
518
518
      {
519
519
        /* login successful */
520
 
        thd->scheduler.logged_in= true;
521
 
        prepare_new_connection_state(thd);
522
 
        if (!libevent_needs_immediate_processing(thd))
 
520
        session->scheduler.logged_in= true;
 
521
        prepare_new_connection_state(session);
 
522
        if (!libevent_needs_immediate_processing(session))
523
523
          continue; /* New connection is now waiting for data in libevent*/
524
524
      }
525
525
    }
527
527
    do
528
528
    {
529
529
      /* Process a query */
530
 
      if (do_command(thd))
 
530
      if (do_command(session))
531
531
      {
532
 
        libevent_connection_close(thd);
 
532
        libevent_connection_close(session);
533
533
        break;
534
534
      }
535
 
    } while (libevent_needs_immediate_processing(thd));
 
535
    } while (libevent_needs_immediate_processing(session));
536
536
  }
537
537
  
538
538
thread_exit:
551
551
  instead it's queued for libevent processing or closed,
552
552
*/
553
553
 
554
 
static bool libevent_needs_immediate_processing(Session *thd)
 
554
static bool libevent_needs_immediate_processing(Session *session)
555
555
{
556
 
  if (libevent_should_close_connection(thd))
 
556
  if (libevent_should_close_connection(session))
557
557
  {
558
 
    libevent_connection_close(thd);
 
558
    libevent_connection_close(session);
559
559
    return false;
560
560
  }
561
561
  /*
564
564
    Note: we cannot add for event processing because the whole request might
565
565
    already be buffered and we wouldn't receive an event.
566
566
  */
567
 
  if (net_more_data(&(thd->net)))
 
567
  if (net_more_data(&(session->net)))
568
568
    return true;
569
569
  
570
 
  thd->scheduler.thread_detach();
571
 
  libevent_thd_add(thd);
 
570
  session->scheduler.thread_detach();
 
571
  libevent_session_add(session);
572
572
  return false;
573
573
}
574
574
 
578
578
  
579
579
  This call does not actually register the event with libevent.
580
580
  Instead, it places the Session onto a queue and signals libevent by writing
581
 
  a byte into thd_add_pipe, which will cause our libevent_add_thd_callback to
 
581
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
582
582
  be invoked which will find the Session on the queue and add it to libevent.
583
583
*/
584
584
 
585
 
static void libevent_thd_add(Session* thd)
 
585
static void libevent_session_add(Session* session)
586
586
{
587
587
  char c=0;
588
 
  pthread_mutex_lock(&LOCK_thd_add);
 
588
  pthread_mutex_lock(&LOCK_session_add);
589
589
  /* queue for libevent */
590
 
  thds_need_adding= list_add(thds_need_adding, &thd->scheduler.list);
 
590
  sessions_need_adding= list_add(sessions_need_adding, &session->scheduler.list);
591
591
  /* notify libevent */
592
 
  assert(write(thd_add_pipe[1], &c, sizeof(c))==sizeof(c));
593
 
  pthread_mutex_unlock(&LOCK_thd_add);
 
592
  assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
593
  pthread_mutex_unlock(&LOCK_session_add);
594
594
}
595
595
 
596
596
 
607
607
  {
608
608
    /* wake up the event loop */
609
609
    char c= 0;
610
 
    assert(write(thd_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
610
    assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
611
611
 
612
612
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
613
613
  }
614
614
  (void) pthread_mutex_unlock(&LOCK_thread_count);
615
615
  
616
 
  event_del(&thd_add_event);
617
 
  close(thd_add_pipe[0]);
618
 
  close(thd_add_pipe[1]);
619
 
  event_del(&thd_kill_event);
620
 
  close(thd_kill_pipe[0]);
621
 
  close(thd_kill_pipe[1]);
 
616
  event_del(&session_add_event);
 
617
  close(session_add_pipe[0]);
 
618
  close(session_add_pipe[1]);
 
619
  event_del(&session_kill_event);
 
620
  close(session_kill_pipe[0]);
 
621
  close(session_kill_pipe[1]);
622
622
 
623
623
  (void) pthread_mutex_destroy(&LOCK_event_loop);
624
 
  (void) pthread_mutex_destroy(&LOCK_thd_add);
 
624
  (void) pthread_mutex_destroy(&LOCK_session_add);
625
625
  return;
626
626
}
627
627