~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to sql/scheduler.cc

Removed DBUG symbols and fixed TRUE/FALSE

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
  Implementation for the thread scheduler
18
18
*/
19
19
 
20
 
#include <drizzled/server_includes.h>
21
 
#include <libdrizzle/libdrizzle.h>
 
20
#ifdef USE_PRAGMA_INTERFACE
 
21
#pragma implementation
 
22
#endif
 
23
 
 
24
#include <mysql_priv.h>
22
25
#include "event.h"
23
26
 
24
27
 
28
31
 */
29
32
 
30
33
static bool init_dummy(void) {return 0;}
31
 
static void post_kill_dummy(THD *thd __attribute__((unused))) {}
 
34
static void post_kill_dummy(THD *thd) {}  
32
35
static void end_dummy(void) {}
33
 
static bool end_thread_dummy(THD *thd __attribute__((unused)),
34
 
                             bool cache_thread __attribute__((unused)))
35
 
{ return 0; }
 
36
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
36
37
 
37
38
/*
38
39
  Initialize default scheduler with dummy functions so that setup functions
47
48
   end_thread(end_thread_dummy), end(end_dummy)
48
49
{}
49
50
 
50
 
static uint32_t created_threads, killed_threads;
 
51
static uint created_threads, killed_threads;
51
52
static bool kill_pool_threads;
52
53
 
53
54
static struct event thd_add_event;
101
102
 
102
103
thd_scheduler::thd_scheduler()
103
104
  : logged_in(false), io_event(NULL), thread_attached(false)
104
 
{
 
105
{  
 
106
#ifndef DBUG_OFF
105
107
  dbug_explain_buf[0]= 0;
 
108
#endif
106
109
}
107
110
 
108
111
 
109
112
thd_scheduler::~thd_scheduler()
110
113
{
111
 
  free(io_event);
 
114
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
112
115
}
113
116
 
114
117
 
115
 
thd_scheduler::thd_scheduler(const thd_scheduler&)
116
 
  : logged_in(false), io_event(NULL), thread_attached(false)
117
 
{}
118
 
 
119
 
void thd_scheduler::operator=(const thd_scheduler&)
120
 
{}
121
 
 
122
118
bool thd_scheduler::init(THD *parent_thd)
123
119
{
124
120
  io_event=
126
122
    
127
123
  if (!io_event)
128
124
  {
129
 
    sql_print_error(_("Memory allocation error in thd_scheduler::init\n"));
 
125
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
130
126
    return true;
131
127
  }
132
128
  
133
 
  event_set(io_event, net_get_sd(&(parent_thd->net)), EV_READ, 
 
129
  event_set(io_event, parent_thd->net.vio->sd, EV_READ, 
134
130
            libevent_io_callback, (void*)parent_thd);
135
131
    
136
132
  list.data= parent_thd;
145
141
 
146
142
bool thd_scheduler::thread_attach()
147
143
{
148
 
  assert(!thread_attached);
 
144
  DBUG_ASSERT(!thread_attached);
149
145
  THD* thd = (THD*)list.data;
150
146
  if (libevent_should_close_connection(thd) ||
151
147
      setup_connection_thread_globals(thd))
155
151
  my_errno= 0;
156
152
  thd->mysys_var->abort= 0;
157
153
  thread_attached= true;
 
154
#ifndef DBUG_OFF
158
155
  swap_dbug_explain();
 
156
#endif
159
157
  return false;
160
158
}
161
159
 
171
169
    THD* thd = (THD*)list.data;
172
170
    thd->mysys_var= NULL;
173
171
    thread_attached= false;
 
172
#ifndef DBUG_OFF
174
173
    swap_dbug_explain();
 
174
#endif
175
175
  }
176
176
}
177
177
 
182
182
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
183
183
  thread during a command, but each command is handled by a different thread.
184
184
*/
 
185
 
 
186
#ifndef DBUG_OFF
185
187
void thd_scheduler::swap_dbug_explain()
186
188
{
187
189
  char buffer[sizeof(dbug_explain_buf)];
 
190
  if (DBUG_EXPLAIN(buffer, sizeof(buffer)))
 
191
    sql_print_error("DBUG_EXPLAIN buffer too small.\n");
 
192
  DBUG_POP();
 
193
  DBUG_PUSH(dbug_explain_buf);
188
194
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
189
195
}
 
196
#endif
190
197
 
191
198
/**
192
199
  Create all threads for the thread pool
203
210
 
204
211
static bool libevent_init(void)
205
212
{
206
 
  uint32_t i;
 
213
  uint i;
 
214
  DBUG_ENTER("libevent_init");
207
215
 
208
216
  event_init();
209
217
  
217
225
  /* set up the pipe used to add new thds to the event pool */
218
226
  if (init_pipe(thd_add_pipe))
219
227
  {
220
 
    sql_print_error(_("init_pipe(thd_add_pipe) error in libevent_init\n"));
221
 
    return(1);
 
228
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
 
229
    DBUG_RETURN(1);
222
230
  }
223
231
  /* set up the pipe used to kill thds in the event queue */
224
232
  if (init_pipe(thd_kill_pipe))
225
233
  {
226
 
    sql_print_error(_("init_pipe(thd_kill_pipe) error in libevent_init\n"));
 
234
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
227
235
    close(thd_add_pipe[0]);
228
236
    close(thd_add_pipe[1]);
229
 
    return(1);
 
237
    DBUG_RETURN(1);
230
238
  }
231
239
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
232
240
            libevent_add_thd_callback, NULL);
235
243
 
236
244
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
237
245
 {
238
 
   sql_print_error(_("thd_add_event event_add error in libevent_init\n"));
 
246
   sql_print_error("thd_add_event event_add error in libevent_init\n");
239
247
   libevent_end();
240
 
   return(1);
 
248
   DBUG_RETURN(1);
241
249
   
242
250
 }
243
251
  /* Set up the thread pool */
251
259
    if ((error= pthread_create(&thread, &connection_attrib,
252
260
                               libevent_thread_proc, 0)))
253
261
    {
254
 
      sql_print_error(_("Can't create completion port thread (error %d)"),
 
262
      sql_print_error("Can't create completion port thread (error %d)",
255
263
                      error);
256
264
      pthread_mutex_unlock(&LOCK_thread_count);
257
265
      libevent_end();                      // Cleanup
258
 
      return(true);
 
266
      DBUG_RETURN(true);
259
267
    }
260
268
  }
261
269
 
264
272
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
265
273
  pthread_mutex_unlock(&LOCK_thread_count);
266
274
  
267
 
  return(false);
 
275
  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
 
276
  DBUG_RETURN(false);
268
277
}
269
278
 
270
279
 
364
373
      /* Add to libevent */
365
374
      if (event_add(thd->scheduler.io_event, NULL))
366
375
      {
367
 
        sql_print_error(_("event_add error in libevent_add_thd_callback\n"));
 
376
        sql_print_error("event_add error in libevent_add_thd_callback\n");
368
377
        libevent_connection_close(thd);
369
378
      } 
370
379
      else
388
397
 
389
398
static void libevent_add_connection(THD *thd)
390
399
{
 
400
  DBUG_ENTER("libevent_add_connection");
 
401
  DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
 
402
                       (long) thd, thd->thread_id));
 
403
  
391
404
  if (thd->scheduler.init(thd))
392
405
  {
393
 
    sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
 
406
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
394
407
    pthread_mutex_unlock(&LOCK_thread_count);
395
408
    libevent_connection_close(thd);
396
 
    return;
 
409
    DBUG_VOID_RETURN;
397
410
  }
398
411
  threads.append(thd);
399
412
  libevent_thd_add(thd);
400
413
  
401
414
  pthread_mutex_unlock(&LOCK_thread_count);
402
 
  return;
 
415
  DBUG_VOID_RETURN;
403
416
}
404
417
 
405
418
 
435
448
 
436
449
static void libevent_connection_close(THD *thd)
437
450
{
 
451
  DBUG_ENTER("libevent_connection_close");
 
452
  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
 
453
 
438
454
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
439
455
 
440
 
  if (net_get_sd(&(thd->net)) >= 0)                  // not already closed
 
456
  if (thd->net.vio->sd >= 0)                  // not already closed
441
457
  {
442
458
    end_connection(thd);
443
459
    close_connection(thd, 0, 1);
446
462
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
447
463
  pthread_mutex_unlock(&LOCK_thread_count);
448
464
 
449
 
  return;
 
465
  DBUG_VOID_RETURN;
450
466
}
451
467
 
452
468
 
456
472
 
457
473
static bool libevent_should_close_connection(THD* thd)
458
474
{
459
 
  return net_should_close(&(thd->net)) ||
 
475
  return thd->net.error ||
 
476
         thd->net.vio == 0 ||
460
477
         thd->killed == THD::KILL_CONNECTION;
461
478
}
462
479
 
466
483
  These procs only return/terminate on shutdown (kill_pool_threads == true).
467
484
*/
468
485
 
469
 
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
 
486
pthread_handler_t libevent_thread_proc(void *arg)
470
487
{
471
488
  if (init_new_connection_handler_thread())
472
489
  {
473
490
    my_thread_global_end();
474
 
    sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
 
491
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
475
492
    exit(1);
476
493
  }
 
494
  DBUG_ENTER("libevent_thread_proc");
477
495
 
478
496
  /*
479
497
    Signal libevent_init() when all threads have been created and are ready to
523
541
    /* is the connection logged in yet? */
524
542
    if (!thd->scheduler.logged_in)
525
543
    {
 
544
      DBUG_PRINT("info", ("init new connection.  sd: %d",
 
545
                          thd->net.vio->sd));
526
546
      if (login_connection(thd))
527
547
      {
528
548
        /* Failed to log in */
551
571
  }
552
572
  
553
573
thread_exit:
 
574
  DBUG_PRINT("exit", ("ending thread"));
554
575
  (void) pthread_mutex_lock(&LOCK_thread_count);
555
576
  killed_threads++;
556
577
  pthread_cond_broadcast(&COND_thread_count);
557
578
  (void) pthread_mutex_unlock(&LOCK_thread_count);
558
579
  my_thread_end();
559
580
  pthread_exit(0);
560
 
  return(0);                               /* purify: deadcode */
 
581
  DBUG_RETURN(0);                               /* purify: deadcode */
561
582
}
562
583
 
563
584
 
579
600
    Note: we cannot add for event processing because the whole request might
580
601
    already be buffered and we wouldn't receive an event.
581
602
  */
582
 
  if (net_more_data(&(thd->net)))
 
603
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
583
604
    return true;
584
605
  
585
606
  thd->scheduler.thread_detach();
615
636
 
616
637
static void libevent_end()
617
638
{
 
639
  DBUG_ENTER("libevent_end");
 
640
  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
 
641
                       created_threads, killed_threads));
 
642
  
 
643
  
618
644
  (void) pthread_mutex_lock(&LOCK_thread_count);
619
645
  
620
646
  kill_pool_threads= true;
637
663
 
638
664
  (void) pthread_mutex_destroy(&LOCK_event_loop);
639
665
  (void) pthread_mutex_destroy(&LOCK_thd_add);
640
 
  return;
 
666
  DBUG_VOID_RETURN;
641
667
}
642
668
 
643
669