~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to sql/scheduler.cc

  • Committer: Stewart Smith
  • Date: 2008-06-30 06:46:40 UTC
  • mto: This revision was merged to the branch mainline in revision 19.
  • Revision ID: stewart@flamingspork.com-20080630064640-1tbyi1e8j4duba45
no embedded server, stop testing for it in tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
  Implementation for the thread scheduler
18
18
*/
19
19
 
20
 
#include <drizzled/server_includes.h>
21
 
#include <libdrizzle/libdrizzle.h>
 
20
#ifdef USE_PRAGMA_INTERFACE
 
21
#pragma implementation
 
22
#endif
 
23
 
 
24
#include <mysql_priv.h>
22
25
#include "event.h"
23
26
 
24
27
 
28
31
 */
29
32
 
30
33
static bool init_dummy(void) {return 0;}
31
 
static void post_kill_dummy(THD *thd __attribute__((unused))) {}
 
34
static void post_kill_dummy(THD *thd) {}  
32
35
static void end_dummy(void) {}
33
 
static bool end_thread_dummy(THD *thd __attribute__((unused)),
34
 
                             bool cache_thread __attribute__((unused)))
35
 
{ return 0; }
 
36
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
36
37
 
37
38
/*
38
39
  Initialize default scheduler with dummy functions so that setup functions
47
48
   end_thread(end_thread_dummy), end(end_dummy)
48
49
{}
49
50
 
50
 
static uint32_t created_threads, killed_threads;
 
51
static uint created_threads, killed_threads;
51
52
static bool kill_pool_threads;
52
53
 
53
54
static struct event thd_add_event;
80
81
 
81
82
/*
82
83
  Create a pipe and set to non-blocking.
83
 
  Returns true if there is an error.
 
84
  Returns TRUE if there is an error.
84
85
*/
85
86
 
86
87
static bool init_pipe(int pipe_fds[])
100
101
*/
101
102
 
102
103
thd_scheduler::thd_scheduler()
103
 
  : logged_in(false), io_event(NULL), thread_attached(false)
 
104
  : logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
104
105
{  
 
106
#ifndef DBUG_OFF
105
107
  dbug_explain_buf[0]= 0;
 
108
#endif
106
109
}
107
110
 
108
111
 
109
112
thd_scheduler::~thd_scheduler()
110
113
{
111
 
  free(io_event);
 
114
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
112
115
}
113
116
 
114
117
 
119
122
    
120
123
  if (!io_event)
121
124
  {
122
 
    sql_print_error(_("Memory allocation error in thd_scheduler::init\n"));
123
 
    return true;
 
125
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
 
126
    return TRUE;
124
127
  }
125
128
  
126
 
  event_set(io_event, net_get_sd(&(parent_thd->net)), EV_READ, 
 
129
  event_set(io_event, parent_thd->net.vio->sd, EV_READ, 
127
130
            libevent_io_callback, (void*)parent_thd);
128
131
    
129
132
  list.data= parent_thd;
130
133
  
131
 
  return false;
 
134
  return FALSE;
132
135
}
133
136
 
134
137
 
138
141
 
139
142
bool thd_scheduler::thread_attach()
140
143
{
141
 
  assert(!thread_attached);
 
144
  DBUG_ASSERT(!thread_attached);
142
145
  THD* thd = (THD*)list.data;
143
146
  if (libevent_should_close_connection(thd) ||
144
147
      setup_connection_thread_globals(thd))
145
148
  {
146
 
    return true;
 
149
    return TRUE;
147
150
  }
148
151
  my_errno= 0;
149
152
  thd->mysys_var->abort= 0;
150
 
  thread_attached= true;
 
153
  thread_attached= TRUE;
 
154
#ifndef DBUG_OFF
151
155
  swap_dbug_explain();
152
 
  return false;
 
156
#endif
 
157
  return FALSE;
153
158
}
154
159
 
155
160
 
163
168
  {
164
169
    THD* thd = (THD*)list.data;
165
170
    thd->mysys_var= NULL;
166
 
    thread_attached= false;
 
171
    thread_attached= FALSE;
 
172
#ifndef DBUG_OFF
167
173
    swap_dbug_explain();
 
174
#endif
168
175
  }
169
176
}
170
177
 
175
182
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
176
183
  thread during a command, but each command is handled by a different thread.
177
184
*/
 
185
 
 
186
#ifndef DBUG_OFF
178
187
void thd_scheduler::swap_dbug_explain()
179
188
{
180
189
  char buffer[sizeof(dbug_explain_buf)];
 
190
  if (DBUG_EXPLAIN(buffer, sizeof(buffer)))
 
191
    sql_print_error("DBUG_EXPLAIN buffer too small.\n");
 
192
  DBUG_POP();
 
193
  DBUG_PUSH(dbug_explain_buf);
181
194
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
182
195
}
 
196
#endif
183
197
 
184
198
/**
185
199
  Create all threads for the thread pool
196
210
 
197
211
static bool libevent_init(void)
198
212
{
199
 
  uint32_t i;
 
213
  uint i;
 
214
  DBUG_ENTER("libevent_init");
200
215
 
201
216
  event_init();
202
217
  
203
218
  created_threads= 0;
204
219
  killed_threads= 0;
205
 
  kill_pool_threads= false;
 
220
  kill_pool_threads= FALSE;
206
221
  
207
222
  pthread_mutex_init(&LOCK_event_loop, NULL);
208
223
  pthread_mutex_init(&LOCK_thd_add, NULL);
210
225
  /* set up the pipe used to add new thds to the event pool */
211
226
  if (init_pipe(thd_add_pipe))
212
227
  {
213
 
    sql_print_error(_("init_pipe(thd_add_pipe) error in libevent_init\n"));
214
 
    return(1);
 
228
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
 
229
    DBUG_RETURN(1);
215
230
  }
216
231
  /* set up the pipe used to kill thds in the event queue */
217
232
  if (init_pipe(thd_kill_pipe))
218
233
  {
219
 
    sql_print_error(_("init_pipe(thd_kill_pipe) error in libevent_init\n"));
 
234
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
220
235
    close(thd_add_pipe[0]);
221
236
    close(thd_add_pipe[1]);
222
 
    return(1);
 
237
    DBUG_RETURN(1);
223
238
  }
224
239
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
225
240
            libevent_add_thd_callback, NULL);
228
243
 
229
244
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
230
245
 {
231
 
   sql_print_error(_("thd_add_event event_add error in libevent_init\n"));
 
246
   sql_print_error("thd_add_event event_add error in libevent_init\n");
232
247
   libevent_end();
233
 
   return(1);
 
248
   DBUG_RETURN(1);
234
249
   
235
250
 }
236
251
  /* Set up the thread pool */
244
259
    if ((error= pthread_create(&thread, &connection_attrib,
245
260
                               libevent_thread_proc, 0)))
246
261
    {
247
 
      sql_print_error(_("Can't create completion port thread (error %d)"),
 
262
      sql_print_error("Can't create completion port thread (error %d)",
248
263
                      error);
249
264
      pthread_mutex_unlock(&LOCK_thread_count);
250
265
      libevent_end();                      // Cleanup
251
 
      return(true);
 
266
      DBUG_RETURN(TRUE);
252
267
    }
253
268
  }
254
269
 
257
272
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
258
273
  pthread_mutex_unlock(&LOCK_thread_count);
259
274
  
260
 
  return(false);
 
275
  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
 
276
  DBUG_RETURN(FALSE);
261
277
}
262
278
 
263
279
 
357
373
      /* Add to libevent */
358
374
      if (event_add(thd->scheduler.io_event, NULL))
359
375
      {
360
 
        sql_print_error(_("event_add error in libevent_add_thd_callback\n"));
 
376
        sql_print_error("event_add error in libevent_add_thd_callback\n");
361
377
        libevent_connection_close(thd);
362
378
      } 
363
379
      else
381
397
 
382
398
static void libevent_add_connection(THD *thd)
383
399
{
 
400
  DBUG_ENTER("libevent_add_connection");
 
401
  DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
 
402
                       (long) thd, thd->thread_id));
 
403
  
384
404
  if (thd->scheduler.init(thd))
385
405
  {
386
 
    sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
 
406
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
387
407
    pthread_mutex_unlock(&LOCK_thread_count);
388
408
    libevent_connection_close(thd);
389
 
    return;
 
409
    DBUG_VOID_RETURN;
390
410
  }
391
411
  threads.append(thd);
392
412
  libevent_thd_add(thd);
393
413
  
394
414
  pthread_mutex_unlock(&LOCK_thread_count);
395
 
  return;
 
415
  DBUG_VOID_RETURN;
396
416
}
397
417
 
398
418
 
428
448
 
429
449
static void libevent_connection_close(THD *thd)
430
450
{
 
451
  DBUG_ENTER("libevent_connection_close");
 
452
  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
 
453
 
431
454
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
432
455
 
433
 
  if (net_get_sd(&(thd->net)) >= 0)                  // not already closed
 
456
  if (thd->net.vio->sd >= 0)                  // not already closed
434
457
  {
435
458
    end_connection(thd);
436
459
    close_connection(thd, 0, 1);
439
462
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
440
463
  pthread_mutex_unlock(&LOCK_thread_count);
441
464
 
442
 
  return;
 
465
  DBUG_VOID_RETURN;
443
466
}
444
467
 
445
468
 
449
472
 
450
473
static bool libevent_should_close_connection(THD* thd)
451
474
{
452
 
  return net_should_close(&(thd->net)) ||
 
475
  return thd->net.error ||
 
476
         thd->net.vio == 0 ||
453
477
         thd->killed == THD::KILL_CONNECTION;
454
478
}
455
479
 
459
483
  These procs only return/terminate on shutdown (kill_pool_threads == true).
460
484
*/
461
485
 
462
 
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
 
486
pthread_handler_t libevent_thread_proc(void *arg)
463
487
{
464
488
  if (init_new_connection_handler_thread())
465
489
  {
466
490
    my_thread_global_end();
467
 
    sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
 
491
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
468
492
    exit(1);
469
493
  }
 
494
  DBUG_ENTER("libevent_thread_proc");
470
495
 
471
496
  /*
472
497
    Signal libevent_init() when all threads have been created and are ready to
516
541
    /* is the connection logged in yet? */
517
542
    if (!thd->scheduler.logged_in)
518
543
    {
 
544
      DBUG_PRINT("info", ("init new connection.  sd: %d",
 
545
                          thd->net.vio->sd));
519
546
      if (login_connection(thd))
520
547
      {
521
548
        /* Failed to log in */
525
552
      else
526
553
      {
527
554
        /* login successful */
528
 
        thd->scheduler.logged_in= true;
 
555
        thd->scheduler.logged_in= TRUE;
529
556
        prepare_new_connection_state(thd);
530
557
        if (!libevent_needs_immediate_processing(thd))
531
558
          continue; /* New connection is now waiting for data in libevent*/
544
571
  }
545
572
  
546
573
thread_exit:
 
574
  DBUG_PRINT("exit", ("ending thread"));
547
575
  (void) pthread_mutex_lock(&LOCK_thread_count);
548
576
  killed_threads++;
549
577
  pthread_cond_broadcast(&COND_thread_count);
550
578
  (void) pthread_mutex_unlock(&LOCK_thread_count);
551
579
  my_thread_end();
552
580
  pthread_exit(0);
553
 
  return(0);                               /* purify: deadcode */
 
581
  DBUG_RETURN(0);                               /* purify: deadcode */
554
582
}
555
583
 
556
584
 
557
585
/*
558
 
  Returns true if the connection needs immediate processing and false if 
 
586
  Returns TRUE if the connection needs immediate processing and FALSE if 
559
587
  instead it's queued for libevent processing or closed.
560
588
*/
561
589
 
564
592
  if (libevent_should_close_connection(thd))
565
593
  {
566
594
    libevent_connection_close(thd);
567
 
    return false;
 
595
    return FALSE;
568
596
  }
569
597
  /*
570
 
    If the socket buffer has more data, return true to process another command.
 
598
    If the socket buffer has more data, return TRUE to process another command.
571
599
 
572
600
    Note: we cannot add for event processing because the whole request might
573
601
    already be buffered and we wouldn't receive an event.
574
602
  */
575
 
  if (net_more_data(&(thd->net)))
576
 
    return true;
 
603
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
 
604
    return TRUE;
577
605
  
578
606
  thd->scheduler.thread_detach();
579
607
  libevent_thd_add(thd);
580
 
  return false;
 
608
  return FALSE;
581
609
}
582
610
 
583
611
 
608
636
 
609
637
static void libevent_end()
610
638
{
 
639
  DBUG_ENTER("libevent_end");
 
640
  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
 
641
                       created_threads, killed_threads));
 
642
  
 
643
  
611
644
  (void) pthread_mutex_lock(&LOCK_thread_count);
612
645
  
613
 
  kill_pool_threads= true;
 
646
  kill_pool_threads= TRUE;
614
647
  while (killed_threads != created_threads)
615
648
  {
616
649
    /* wake up the event loop */
630
663
 
631
664
  (void) pthread_mutex_destroy(&LOCK_event_loop);
632
665
  (void) pthread_mutex_destroy(&LOCK_thd_add);
633
 
  return;
 
666
  DBUG_VOID_RETURN;
634
667
}
635
668
 
636
669