~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/scheduler.cc

  • Committer: Brian Aker
  • Date: 2008-07-20 07:36:57 UTC
  • Revision ID: brian@tangent.org-20080720073657-qrzqnfu31mut8vjd
my_bool...

Show diffs side-by-side

added added

removed removed

Lines of Context:
31
31
 */
32
32
 
33
33
static bool init_dummy(void) {return 0;}
34
 
static void post_kill_dummy(THD *thd) {}  
 
34
static void post_kill_dummy(THD *thd __attribute__((__unused__))) {}
35
35
static void end_dummy(void) {}
36
 
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
 
36
static bool end_thread_dummy(THD *thd __attribute__((__unused__)),
 
37
                             bool cache_thread __attribute__((__unused__)))
 
38
{ return 0; }
37
39
 
38
40
/*
39
41
  Initialize default scheduler with dummy functions so that setup functions
81
83
 
82
84
/*
83
85
  Create a pipe and set to non-blocking.
84
 
  Returns TRUE if there is an error.
 
86
  Returns true if there is an error.
85
87
*/
86
88
 
87
89
static bool init_pipe(int pipe_fds[])
101
103
*/
102
104
 
103
105
thd_scheduler::thd_scheduler()
104
 
  : logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
 
106
  : logged_in(false), io_event(NULL), thread_attached(false)
105
107
{  
106
 
#ifndef DBUG_OFF
107
108
  dbug_explain_buf[0]= 0;
108
 
#endif
109
109
}
110
110
 
111
111
 
123
123
  if (!io_event)
124
124
  {
125
125
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
126
 
    return TRUE;
 
126
    return true;
127
127
  }
128
128
  
129
129
  event_set(io_event, parent_thd->net.vio->sd, EV_READ, 
131
131
    
132
132
  list.data= parent_thd;
133
133
  
134
 
  return FALSE;
 
134
  return false;
135
135
}
136
136
 
137
137
 
141
141
 
142
142
bool thd_scheduler::thread_attach()
143
143
{
144
 
  DBUG_ASSERT(!thread_attached);
 
144
  assert(!thread_attached);
145
145
  THD* thd = (THD*)list.data;
146
146
  if (libevent_should_close_connection(thd) ||
147
147
      setup_connection_thread_globals(thd))
148
148
  {
149
 
    return TRUE;
 
149
    return true;
150
150
  }
151
151
  my_errno= 0;
152
152
  thd->mysys_var->abort= 0;
153
 
  thread_attached= TRUE;
154
 
#ifndef DBUG_OFF
 
153
  thread_attached= true;
155
154
  swap_dbug_explain();
156
 
#endif
157
 
  return FALSE;
 
155
  return false;
158
156
}
159
157
 
160
158
 
168
166
  {
169
167
    THD* thd = (THD*)list.data;
170
168
    thd->mysys_var= NULL;
171
 
    thread_attached= FALSE;
172
 
#ifndef DBUG_OFF
 
169
    thread_attached= false;
173
170
    swap_dbug_explain();
174
 
#endif
175
171
  }
176
172
}
177
173
 
182
178
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
183
179
  thread during a command, but each command is handled by a different thread.
184
180
*/
185
 
 
186
 
#ifndef DBUG_OFF
187
181
void thd_scheduler::swap_dbug_explain()
188
182
{
189
183
  char buffer[sizeof(dbug_explain_buf)];
190
 
  if (DBUG_EXPLAIN(buffer, sizeof(buffer)))
191
 
    sql_print_error("DBUG_EXPLAIN buffer too small.\n");
192
 
  DBUG_POP();
193
 
  DBUG_PUSH(dbug_explain_buf);
194
184
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
195
185
}
196
 
#endif
197
186
 
198
187
/**
199
188
  Create all threads for the thread pool
211
200
static bool libevent_init(void)
212
201
{
213
202
  uint i;
214
 
  DBUG_ENTER("libevent_init");
215
203
 
216
204
  event_init();
217
205
  
218
206
  created_threads= 0;
219
207
  killed_threads= 0;
220
 
  kill_pool_threads= FALSE;
 
208
  kill_pool_threads= false;
221
209
  
222
210
  pthread_mutex_init(&LOCK_event_loop, NULL);
223
211
  pthread_mutex_init(&LOCK_thd_add, NULL);
226
214
  if (init_pipe(thd_add_pipe))
227
215
  {
228
216
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
229
 
    DBUG_RETURN(1);
 
217
    return(1);
230
218
  }
231
219
  /* set up the pipe used to kill thds in the event queue */
232
220
  if (init_pipe(thd_kill_pipe))
234
222
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
235
223
    close(thd_add_pipe[0]);
236
224
    close(thd_add_pipe[1]);
237
 
    DBUG_RETURN(1);
 
225
    return(1);
238
226
  }
239
227
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
240
228
            libevent_add_thd_callback, NULL);
245
233
 {
246
234
   sql_print_error("thd_add_event event_add error in libevent_init\n");
247
235
   libevent_end();
248
 
   DBUG_RETURN(1);
 
236
   return(1);
249
237
   
250
238
 }
251
239
  /* Set up the thread pool */
263
251
                      error);
264
252
      pthread_mutex_unlock(&LOCK_thread_count);
265
253
      libevent_end();                      // Cleanup
266
 
      DBUG_RETURN(TRUE);
 
254
      return(true);
267
255
    }
268
256
  }
269
257
 
272
260
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
273
261
  pthread_mutex_unlock(&LOCK_thread_count);
274
262
  
275
 
  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
276
 
  DBUG_RETURN(FALSE);
 
263
  return(false);
277
264
}
278
265
 
279
266
 
397
384
 
398
385
static void libevent_add_connection(THD *thd)
399
386
{
400
 
  DBUG_ENTER("libevent_add_connection");
401
 
  DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
402
 
                       (long) thd, thd->thread_id));
403
 
  
404
387
  if (thd->scheduler.init(thd))
405
388
  {
406
389
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
407
390
    pthread_mutex_unlock(&LOCK_thread_count);
408
391
    libevent_connection_close(thd);
409
 
    DBUG_VOID_RETURN;
 
392
    return;
410
393
  }
411
394
  threads.append(thd);
412
395
  libevent_thd_add(thd);
413
396
  
414
397
  pthread_mutex_unlock(&LOCK_thread_count);
415
 
  DBUG_VOID_RETURN;
 
398
  return;
416
399
}
417
400
 
418
401
 
448
431
 
449
432
static void libevent_connection_close(THD *thd)
450
433
{
451
 
  DBUG_ENTER("libevent_connection_close");
452
 
  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
453
 
 
454
434
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
455
435
 
456
436
  if (thd->net.vio->sd >= 0)                  // not already closed
462
442
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
463
443
  pthread_mutex_unlock(&LOCK_thread_count);
464
444
 
465
 
  DBUG_VOID_RETURN;
 
445
  return;
466
446
}
467
447
 
468
448
 
483
463
  These procs only return/terminate on shutdown (kill_pool_threads == true).
484
464
*/
485
465
 
486
 
pthread_handler_t libevent_thread_proc(void *arg)
 
466
pthread_handler_t libevent_thread_proc(void *arg __attribute__((__unused__)))
487
467
{
488
468
  if (init_new_connection_handler_thread())
489
469
  {
491
471
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
492
472
    exit(1);
493
473
  }
494
 
  DBUG_ENTER("libevent_thread_proc");
495
474
 
496
475
  /*
497
476
    Signal libevent_init() when all threads has been created and are ready to
541
520
    /* is the connection logged in yet? */
542
521
    if (!thd->scheduler.logged_in)
543
522
    {
544
 
      DBUG_PRINT("info", ("init new connection.  sd: %d",
545
 
                          thd->net.vio->sd));
546
523
      if (login_connection(thd))
547
524
      {
548
525
        /* Failed to log in */
552
529
      else
553
530
      {
554
531
        /* login successful */
555
 
        thd->scheduler.logged_in= TRUE;
 
532
        thd->scheduler.logged_in= true;
556
533
        prepare_new_connection_state(thd);
557
534
        if (!libevent_needs_immediate_processing(thd))
558
535
          continue; /* New connection is now waiting for data in libevent*/
571
548
  }
572
549
  
573
550
thread_exit:
574
 
  DBUG_PRINT("exit", ("ending thread"));
575
551
  (void) pthread_mutex_lock(&LOCK_thread_count);
576
552
  killed_threads++;
577
553
  pthread_cond_broadcast(&COND_thread_count);
578
554
  (void) pthread_mutex_unlock(&LOCK_thread_count);
579
555
  my_thread_end();
580
556
  pthread_exit(0);
581
 
  DBUG_RETURN(0);                               /* purify: deadcode */
 
557
  return(0);                               /* purify: deadcode */
582
558
}
583
559
 
584
560
 
585
561
/*
586
 
  Returns TRUE if the connection needs immediate processing and FALSE if 
 
562
  Returns true if the connection needs immediate processing and false if 
587
563
  instead it's queued for libevent processing or closed,
588
564
*/
589
565
 
592
568
  if (libevent_should_close_connection(thd))
593
569
  {
594
570
    libevent_connection_close(thd);
595
 
    return FALSE;
 
571
    return false;
596
572
  }
597
573
  /*
598
 
    If more data in the socket buffer, return TRUE to process another command.
 
574
    If more data in the socket buffer, return true to process another command.
599
575
 
600
576
    Note: we cannot add for event processing because the whole request might
601
577
    already be buffered and we wouldn't receive an event.
602
578
  */
603
579
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
604
 
    return TRUE;
 
580
    return true;
605
581
  
606
582
  thd->scheduler.thread_detach();
607
583
  libevent_thd_add(thd);
608
 
  return FALSE;
 
584
  return false;
609
585
}
610
586
 
611
587
 
636
612
 
637
613
static void libevent_end()
638
614
{
639
 
  DBUG_ENTER("libevent_end");
640
 
  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
641
 
                       created_threads, killed_threads));
642
 
  
643
 
  
644
615
  (void) pthread_mutex_lock(&LOCK_thread_count);
645
616
  
646
 
  kill_pool_threads= TRUE;
 
617
  kill_pool_threads= true;
647
618
  while (killed_threads != created_threads)
648
619
  {
649
620
    /* wake up the event loop */
663
634
 
664
635
  (void) pthread_mutex_destroy(&LOCK_event_loop);
665
636
  (void) pthread_mutex_destroy(&LOCK_thd_add);
666
 
  DBUG_VOID_RETURN;
 
637
  return;
667
638
}
668
639
 
669
640