~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/scheduler.cc

  • Committer: Eric Herman
  • Date: 2008-12-06 19:42:46 UTC
  • mto: (656.1.6 devel)
  • mto: This revision was merged to the branch mainline in revision 665.
  • Revision ID: eric@mysql.com-20081206194246-5cdexuu81i366eek
removed trailing whitespace with simple script:

for file in $(find . -name "*.c") $(find . -name "*.cc") $(find . -name "*.h"); do ruby -pe 'gsub(/\s+$/, $/)' < $file > $file.out; mv $file.out $file; done;

Show diffs side-by-side

added added

removed removed

Lines of Context:
63
63
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
64
64
 
65
65
/*
66
 
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and 
 
66
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and
67
67
  event_del) and sessions_need_processing and sessions_waiting_for_io.
68
68
*/
69
69
static pthread_mutex_t LOCK_event_loop;
104
104
 
105
105
session_scheduler::session_scheduler()
106
106
  : logged_in(false), io_event(NULL), thread_attached(false)
107
 
{  
 
107
{
108
108
}
109
109
 
110
110
 
124
124
bool session_scheduler::init(Session *parent_session)
125
125
{
126
126
  io_event= (struct event*)malloc(sizeof(*io_event));
127
 
    
 
127
 
128
128
  if (!io_event)
129
129
  {
130
130
    sql_print_error(_("Memory allocation error in session_scheduler::init\n"));
131
131
    return true;
132
132
  }
133
133
  memset(io_event, 0, sizeof(*io_event));
134
 
 
135
 
  event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ, 
 
134
 
 
135
  event_set(io_event, net_get_sd(&(parent_session->net)), EV_READ,
136
136
            libevent_io_callback, (void*)parent_session);
137
 
    
 
137
 
138
138
  list.data= parent_session;
139
 
  
 
139
 
140
140
  return false;
141
141
}
142
142
 
194
194
  uint32_t i;
195
195
 
196
196
  event_init();
197
 
  
 
197
 
198
198
  created_threads= 0;
199
199
  killed_threads= 0;
200
200
  kill_pool_threads= false;
201
 
  
 
201
 
202
202
  pthread_mutex_init(&LOCK_event_loop, NULL);
203
203
  pthread_mutex_init(&LOCK_session_add, NULL);
204
 
  
 
204
 
205
205
  /* set up the pipe used to add new sessions to the event pool */
206
206
  if (init_pipe(session_add_pipe))
207
207
  {
220
220
            libevent_add_session_callback, NULL);
221
221
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
222
222
            libevent_kill_session_callback, NULL);
223
 
 
 
223
 
224
224
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
225
225
 {
226
226
   sql_print_error(_("session_add_event event_add error in libevent_init\n"));
227
227
   libevent_end();
228
228
   return(1);
229
 
   
 
229
 
230
230
 }
231
231
  /* Set up the thread pool */
232
232
  created_threads= killed_threads= 0;
251
251
  while (created_threads != thread_pool_size)
252
252
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
253
253
  pthread_mutex_unlock(&LOCK_thread_count);
254
 
  
 
254
 
255
255
  return(false);
256
256
}
257
257
 
258
258
 
259
259
/*
260
260
  This is called when data is ready on the socket.
261
 
  
 
261
 
262
262
  NOTES
263
263
    This is only called by the thread that owns LOCK_event_loop.
264
 
  
265
 
    We add the session that got the data to sessions_need_processing, and 
 
264
 
 
265
    We add the session that got the data to sessions_need_processing, and
266
266
    cause the libevent event_loop() to terminate. Then this same thread will
267
267
    return from event_loop and pick the session value back up for processing.
268
268
*/
269
269
 
270
270
void libevent_io_callback(int, short, void *ctx)
271
 
{    
 
271
{
272
272
  safe_mutex_assert_owner(&LOCK_event_loop);
273
273
  Session *session= (Session*)ctx;
274
274
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &session->scheduler.list);
277
277
 
278
278
/*
279
279
  This is called when we have a thread we want to be killed.
280
 
  
 
280
 
281
281
  NOTES
282
282
    This is only called by the thread that owns LOCK_event_loop.
283
283
*/
284
284
 
285
285
void libevent_kill_session_callback(int Fd, short, void*)
286
 
{    
 
286
{
287
287
  safe_mutex_assert_owner(&LOCK_event_loop);
288
288
 
289
289
  /* clear the pending events */
315
315
  This is used to add connections to the pool. This callback is invoked from
316
316
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
317
317
  written to it.
318
 
  
 
318
 
319
319
  NOTES
320
320
    This is only called by the thread that owns LOCK_event_loop.
321
321
*/
322
322
 
323
323
void libevent_add_session_callback(int Fd, short, void *)
324
 
 
324
{
325
325
  safe_mutex_assert_owner(&LOCK_event_loop);
326
326
 
327
327
  /* clear the pending events */
337
337
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
338
338
 
339
339
    pthread_mutex_unlock(&LOCK_session_add);
340
 
    
 
340
 
341
341
    if (!session->scheduler.logged_in || libevent_should_close_connection(session))
342
342
    {
343
343
      /*
354
354
      {
355
355
        sql_print_error(_("event_add error in libevent_add_session_callback\n"));
356
356
        libevent_connection_close(session);
357
 
      } 
 
357
      }
358
358
      else
359
359
      {
360
360
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
385
385
  }
386
386
  threads.append(session);
387
387
  libevent_session_add(session);
388
 
  
 
388
 
389
389
  pthread_mutex_unlock(&LOCK_thread_count);
390
390
  return;
391
391
}
393
393
 
394
394
/**
395
395
  @brief Signal a waiting connection it's time to die.
396
 
 
 
396
 
397
397
  @details This function will signal libevent the Session should be killed.
398
398
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
399
399
    upon entry.
400
 
 
 
400
 
401
401
  @param[in]  session The connection to kill
402
402
*/
403
403
 
407
407
    Note, we just wake up libevent with an event that a Session should be killed,
408
408
    It will search its list of sessions for session->killed ==  KILL_CONNECTION to
409
409
    find the Sessions it should kill.
410
 
    
 
410
 
411
411
    So we don't actually tell it which one and we don't actually use the
412
412
    Session being passed to us, but that's just a design detail that could change
413
413
    later.
472
472
  if (created_threads == thread_pool_size)
473
473
    (void) pthread_cond_signal(&COND_thread_count);
474
474
  (void) pthread_mutex_unlock(&LOCK_thread_count);
475
 
  
 
475
 
476
476
  for (;;)
477
477
  {
478
478
    Session *session= NULL;
479
479
    (void) pthread_mutex_lock(&LOCK_event_loop);
480
 
    
 
480
 
481
481
    /* get session(s) to process */
482
482
    while (!sessions_need_processing)
483
483
    {
489
489
      }
490
490
      event_loop(EVLOOP_ONCE);
491
491
    }
492
 
    
 
492
 
493
493
    /* pop the first session off the list */
494
494
    session= (Session*)sessions_need_processing->data;
495
495
    sessions_need_processing= list_delete(sessions_need_processing,
496
496
                                      sessions_need_processing);
497
 
    
 
497
 
498
498
    (void) pthread_mutex_unlock(&LOCK_event_loop);
499
 
    
 
499
 
500
500
    /* now we process the connection (session) */
501
 
    
 
501
 
502
502
    /* set up the session<->thread links. */
503
503
    session->thread_stack= (char*) &session;
504
 
    
 
504
 
505
505
    if (session->scheduler.thread_attach())
506
506
    {
507
507
      libevent_connection_close(session);
537
537
      }
538
538
    } while (libevent_needs_immediate_processing(session));
539
539
  }
540
 
  
 
540
 
541
541
thread_exit:
542
542
  (void) pthread_mutex_lock(&LOCK_thread_count);
543
543
  killed_threads++;
550
550
 
551
551
 
552
552
/*
553
 
  Returns true if the connection needs immediate processing and false if 
 
553
  Returns true if the connection needs immediate processing and false if
554
554
  instead it's queued for libevent processing or closed,
555
555
*/
556
556
 
569
569
  */
570
570
  if (net_more_data(&(session->net)))
571
571
    return true;
572
 
  
 
572
 
573
573
  session->scheduler.thread_detach();
574
574
  libevent_session_add(session);
575
575
  return false;
578
578
 
579
579
/*
580
580
  Adds a Session to queued for libevent processing.
581
 
  
 
581
 
582
582
  This call does not actually register the event with libevent.
583
583
  Instead, it places the Session onto a queue and signals libevent by writing
584
584
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
604
604
static void libevent_end()
605
605
{
606
606
  (void) pthread_mutex_lock(&LOCK_thread_count);
607
 
  
 
607
 
608
608
  kill_pool_threads= true;
609
609
  while (killed_threads != created_threads)
610
610
  {
615
615
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
616
616
  }
617
617
  (void) pthread_mutex_unlock(&LOCK_thread_count);
618
 
  
 
618
 
619
619
  event_del(&session_add_event);
620
620
  close(session_add_pipe[0]);
621
621
  close(session_add_pipe[1]);