~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to sql/scheduler.cc

  • Committer: Monty Taylor
  • Date: 2008-07-02 14:35:48 UTC
  • mto: This revision was merged to the branch mainline in revision 51.
  • Revision ID: monty@inaugust.com-20080702143548-onj30ry0sugr01uw
Removed all references to THREAD.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
  Implementation for the thread scheduler
18
18
*/
19
19
 
20
 
#include <drizzled/server_includes.h>
21
 
#include <libdrizzle/libdrizzle.h>
 
20
#ifdef USE_PRAGMA_INTERFACE
 
21
#pragma implementation
 
22
#endif
 
23
 
 
24
#include <mysql_priv.h>
22
25
#include "event.h"
23
26
 
24
27
 
28
31
 */
29
32
 
30
33
static bool init_dummy(void) {return 0;}
31
 
static void post_kill_dummy(THD *thd __attribute__((unused))) {}
 
34
static void post_kill_dummy(THD *thd) {}  
32
35
static void end_dummy(void) {}
33
 
static bool end_thread_dummy(THD *thd __attribute__((unused)),
34
 
                             bool cache_thread __attribute__((unused)))
35
 
{ return 0; }
 
36
static bool end_thread_dummy(THD *thd, bool cache_thread) { return 0; }
36
37
 
37
38
/*
38
39
  Initialize default scheduler with dummy functions so that setup functions
47
48
   end_thread(end_thread_dummy), end(end_dummy)
48
49
{}
49
50
 
50
 
static uint32_t created_threads, killed_threads;
 
51
static uint created_threads, killed_threads;
51
52
static bool kill_pool_threads;
52
53
 
53
54
static struct event thd_add_event;
80
81
 
81
82
/*
82
83
  Create a pipe and set to non-blocking.
83
 
  Returns true if there is an error.
 
84
  Returns TRUE if there is an error.
84
85
*/
85
86
 
86
87
static bool init_pipe(int pipe_fds[])
100
101
*/
101
102
 
102
103
thd_scheduler::thd_scheduler()
103
 
  : logged_in(false), io_event(NULL), thread_attached(false)
104
 
{
 
104
  : logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
 
105
{  
 
106
#ifndef DBUG_OFF
105
107
  dbug_explain_buf[0]= 0;
 
108
#endif
106
109
}
107
110
 
108
111
 
109
112
thd_scheduler::~thd_scheduler()
110
113
{
111
 
  free(io_event);
 
114
  my_free(io_event, MYF(MY_ALLOW_ZERO_PTR));
112
115
}
113
116
 
114
117
 
115
 
thd_scheduler::thd_scheduler(const thd_scheduler&)
116
 
  : logged_in(false), io_event(NULL), thread_attached(false)
117
 
{}
118
 
 
119
 
void thd_scheduler::operator=(const thd_scheduler&)
120
 
{}
121
 
 
122
118
bool thd_scheduler::init(THD *parent_thd)
123
119
{
124
120
  io_event=
126
122
    
127
123
  if (!io_event)
128
124
  {
129
 
    sql_print_error(_("Memory allocation error in thd_scheduler::init\n"));
130
 
    return true;
 
125
    sql_print_error("Memory allocation error in thd_scheduler::init\n");
 
126
    return TRUE;
131
127
  }
132
128
  
133
 
  event_set(io_event, net_get_sd(&(parent_thd->net)), EV_READ, 
 
129
  event_set(io_event, parent_thd->net.vio->sd, EV_READ, 
134
130
            libevent_io_callback, (void*)parent_thd);
135
131
    
136
132
  list.data= parent_thd;
137
133
  
138
 
  return false;
 
134
  return FALSE;
139
135
}
140
136
 
141
137
 
145
141
 
146
142
bool thd_scheduler::thread_attach()
147
143
{
148
 
  assert(!thread_attached);
 
144
  DBUG_ASSERT(!thread_attached);
149
145
  THD* thd = (THD*)list.data;
150
146
  if (libevent_should_close_connection(thd) ||
151
147
      setup_connection_thread_globals(thd))
152
148
  {
153
 
    return true;
 
149
    return TRUE;
154
150
  }
155
151
  my_errno= 0;
156
152
  thd->mysys_var->abort= 0;
157
 
  thread_attached= true;
 
153
  thread_attached= TRUE;
 
154
#ifndef DBUG_OFF
158
155
  swap_dbug_explain();
159
 
  return false;
 
156
#endif
 
157
  return FALSE;
160
158
}
161
159
 
162
160
 
170
168
  {
171
169
    THD* thd = (THD*)list.data;
172
170
    thd->mysys_var= NULL;
173
 
    thread_attached= false;
 
171
    thread_attached= FALSE;
 
172
#ifndef DBUG_OFF
174
173
    swap_dbug_explain();
 
174
#endif
175
175
  }
176
176
}
177
177
 
182
182
  This is used to preserve the SESSION DEBUG variable, which is mapped to the OS 
183
183
  thread during a command, but each command is handled by a different thread.
184
184
*/
 
185
 
 
186
#ifndef DBUG_OFF
185
187
void thd_scheduler::swap_dbug_explain()
186
188
{
187
189
  char buffer[sizeof(dbug_explain_buf)];
 
190
  if (DBUG_EXPLAIN(buffer, sizeof(buffer)))
 
191
    sql_print_error("DBUG_EXPLAIN buffer too small.\n");
 
192
  DBUG_POP();
 
193
  DBUG_PUSH(dbug_explain_buf);
188
194
  memcpy(dbug_explain_buf, buffer, sizeof(buffer));
189
195
}
 
196
#endif
190
197
 
191
198
/**
192
199
  Create all threads for the thread pool
203
210
 
204
211
static bool libevent_init(void)
205
212
{
206
 
  uint32_t i;
 
213
  uint i;
 
214
  DBUG_ENTER("libevent_init");
207
215
 
208
216
  event_init();
209
217
  
210
218
  created_threads= 0;
211
219
  killed_threads= 0;
212
 
  kill_pool_threads= false;
 
220
  kill_pool_threads= FALSE;
213
221
  
214
222
  pthread_mutex_init(&LOCK_event_loop, NULL);
215
223
  pthread_mutex_init(&LOCK_thd_add, NULL);
217
225
  /* set up the pipe used to add new thds to the event pool */
218
226
  if (init_pipe(thd_add_pipe))
219
227
  {
220
 
    sql_print_error(_("init_pipe(thd_add_pipe) error in libevent_init\n"));
221
 
    return(1);
 
228
    sql_print_error("init_pipe(thd_add_pipe) error in libevent_init\n");
 
229
    DBUG_RETURN(1);
222
230
  }
223
231
  /* set up the pipe used to kill thds in the event queue */
224
232
  if (init_pipe(thd_kill_pipe))
225
233
  {
226
 
    sql_print_error(_("init_pipe(thd_kill_pipe) error in libevent_init\n"));
 
234
    sql_print_error("init_pipe(thd_kill_pipe) error in libevent_init\n");
227
235
    close(thd_add_pipe[0]);
228
236
    close(thd_add_pipe[1]);
229
 
    return(1);
 
237
    DBUG_RETURN(1);
230
238
  }
231
239
  event_set(&thd_add_event, thd_add_pipe[0], EV_READ|EV_PERSIST,
232
240
            libevent_add_thd_callback, NULL);
235
243
 
236
244
 if (event_add(&thd_add_event, NULL) || event_add(&thd_kill_event, NULL))
237
245
 {
238
 
   sql_print_error(_("thd_add_event event_add error in libevent_init\n"));
 
246
   sql_print_error("thd_add_event event_add error in libevent_init\n");
239
247
   libevent_end();
240
 
   return(1);
 
248
   DBUG_RETURN(1);
241
249
   
242
250
 }
243
251
  /* Set up the thread pool */
251
259
    if ((error= pthread_create(&thread, &connection_attrib,
252
260
                               libevent_thread_proc, 0)))
253
261
    {
254
 
      sql_print_error(_("Can't create completion port thread (error %d)"),
 
262
      sql_print_error("Can't create completion port thread (error %d)",
255
263
                      error);
256
264
      pthread_mutex_unlock(&LOCK_thread_count);
257
265
      libevent_end();                      // Cleanup
258
 
      return(true);
 
266
      DBUG_RETURN(TRUE);
259
267
    }
260
268
  }
261
269
 
264
272
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
265
273
  pthread_mutex_unlock(&LOCK_thread_count);
266
274
  
267
 
  return(false);
 
275
  DBUG_PRINT("info", ("%u threads created", (uint) thread_pool_size));
 
276
  DBUG_RETURN(FALSE);
268
277
}
269
278
 
270
279
 
364
373
      /* Add to libevent */
365
374
      if (event_add(thd->scheduler.io_event, NULL))
366
375
      {
367
 
        sql_print_error(_("event_add error in libevent_add_thd_callback\n"));
 
376
        sql_print_error("event_add error in libevent_add_thd_callback\n");
368
377
        libevent_connection_close(thd);
369
378
      } 
370
379
      else
388
397
 
389
398
static void libevent_add_connection(THD *thd)
390
399
{
 
400
  DBUG_ENTER("libevent_add_connection");
 
401
  DBUG_PRINT("enter", ("thd: 0x%lx  thread_id: %lu",
 
402
                       (long) thd, thd->thread_id));
 
403
  
391
404
  if (thd->scheduler.init(thd))
392
405
  {
393
 
    sql_print_error(_("Scheduler init error in libevent_add_new_connection\n"));
 
406
    sql_print_error("Scheduler init error in libevent_add_new_connection\n");
394
407
    pthread_mutex_unlock(&LOCK_thread_count);
395
408
    libevent_connection_close(thd);
396
 
    return;
 
409
    DBUG_VOID_RETURN;
397
410
  }
398
411
  threads.append(thd);
399
412
  libevent_thd_add(thd);
400
413
  
401
414
  pthread_mutex_unlock(&LOCK_thread_count);
402
 
  return;
 
415
  DBUG_VOID_RETURN;
403
416
}
404
417
 
405
418
 
435
448
 
436
449
static void libevent_connection_close(THD *thd)
437
450
{
 
451
  DBUG_ENTER("libevent_connection_close");
 
452
  DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
 
453
 
438
454
  thd->killed= THD::KILL_CONNECTION;          // Avoid error messages
439
455
 
440
 
  if (net_get_sd(&(thd->net)) >= 0)                  // not already closed
 
456
  if (thd->net.vio->sd >= 0)                  // not already closed
441
457
  {
442
458
    end_connection(thd);
443
459
    close_connection(thd, 0, 1);
446
462
  unlink_thd(thd);   /* locks LOCK_thread_count and deletes thd */
447
463
  pthread_mutex_unlock(&LOCK_thread_count);
448
464
 
449
 
  return;
 
465
  DBUG_VOID_RETURN;
450
466
}
451
467
 
452
468
 
456
472
 
457
473
static bool libevent_should_close_connection(THD* thd)
458
474
{
459
 
  return net_should_close(&(thd->net)) ||
 
475
  return thd->net.error ||
 
476
         thd->net.vio == 0 ||
460
477
         thd->killed == THD::KILL_CONNECTION;
461
478
}
462
479
 
466
483
  These procs only return/terminate on shutdown (kill_pool_threads == true).
467
484
*/
468
485
 
469
 
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
 
486
pthread_handler_t libevent_thread_proc(void *arg)
470
487
{
471
488
  if (init_new_connection_handler_thread())
472
489
  {
473
490
    my_thread_global_end();
474
 
    sql_print_error(_("libevent_thread_proc: my_thread_init() failed\n"));
 
491
    sql_print_error("libevent_thread_proc: my_thread_init() failed\n");
475
492
    exit(1);
476
493
  }
 
494
  DBUG_ENTER("libevent_thread_proc");
477
495
 
478
496
  /*
479
497
    Signal libevent_init() when all threads has been created and are ready to
523
541
    /* is the connection logged in yet? */
524
542
    if (!thd->scheduler.logged_in)
525
543
    {
 
544
      DBUG_PRINT("info", ("init new connection.  sd: %d",
 
545
                          thd->net.vio->sd));
526
546
      if (login_connection(thd))
527
547
      {
528
548
        /* Failed to log in */
532
552
      else
533
553
      {
534
554
        /* login successful */
535
 
        thd->scheduler.logged_in= true;
 
555
        thd->scheduler.logged_in= TRUE;
536
556
        prepare_new_connection_state(thd);
537
557
        if (!libevent_needs_immediate_processing(thd))
538
558
          continue; /* New connection is now waiting for data in libevent*/
551
571
  }
552
572
  
553
573
thread_exit:
 
574
  DBUG_PRINT("exit", ("ending thread"));
554
575
  (void) pthread_mutex_lock(&LOCK_thread_count);
555
576
  killed_threads++;
556
577
  pthread_cond_broadcast(&COND_thread_count);
557
578
  (void) pthread_mutex_unlock(&LOCK_thread_count);
558
579
  my_thread_end();
559
580
  pthread_exit(0);
560
 
  return(0);                               /* purify: deadcode */
 
581
  DBUG_RETURN(0);                               /* purify: deadcode */
561
582
}
562
583
 
563
584
 
564
585
/*
565
 
  Returns true if the connection needs immediate processing and false if 
 
586
  Returns TRUE if the connection needs immediate processing and FALSE if 
566
587
  instead it's queued for libevent processing or closed,
567
588
*/
568
589
 
571
592
  if (libevent_should_close_connection(thd))
572
593
  {
573
594
    libevent_connection_close(thd);
574
 
    return false;
 
595
    return FALSE;
575
596
  }
576
597
  /*
577
 
    If more data in the socket buffer, return true to process another command.
 
598
    If more data in the socket buffer, return TRUE to process another command.
578
599
 
579
600
    Note: we cannot add for event processing because the whole request might
580
601
    already be buffered and we wouldn't receive an event.
581
602
  */
582
 
  if (net_more_data(&(thd->net)))
583
 
    return true;
 
603
  if (thd->net.vio == 0 || thd->net.vio->read_pos < thd->net.vio->read_end)
 
604
    return TRUE;
584
605
  
585
606
  thd->scheduler.thread_detach();
586
607
  libevent_thd_add(thd);
587
 
  return false;
 
608
  return FALSE;
588
609
}
589
610
 
590
611
 
615
636
 
616
637
static void libevent_end()
617
638
{
 
639
  DBUG_ENTER("libevent_end");
 
640
  DBUG_PRINT("enter", ("created_threads: %d  killed_threads: %u",
 
641
                       created_threads, killed_threads));
 
642
  
 
643
  
618
644
  (void) pthread_mutex_lock(&LOCK_thread_count);
619
645
  
620
 
  kill_pool_threads= true;
 
646
  kill_pool_threads= TRUE;
621
647
  while (killed_threads != created_threads)
622
648
  {
623
649
    /* wake up the event loop */
637
663
 
638
664
  (void) pthread_mutex_destroy(&LOCK_event_loop);
639
665
  (void) pthread_mutex_destroy(&LOCK_thd_add);
640
 
  return;
 
666
  DBUG_VOID_RETURN;
641
667
}
642
668
 
643
669