~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pool_of_threads/pool_of_threads.cc

  • Committer: Brian Aker
  • Date: 2009-01-24 04:31:39 UTC
  • Revision ID: brian@gir-3.local-20090124043139-5cu9wjefszrnyhe0
Refactored all current scheduler to be behind scheduler plugin api.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
15
 
 
16
#include <drizzled/server_includes.h>
 
17
#include <drizzled/gettext.h>
 
18
#include <drizzled/error.h>
 
19
#include <drizzled/plugin_scheduling.h>
 
20
#include <drizzled/serialize/serialize.h>
 
21
#include <drizzled/connect.h>
 
22
#include <drizzled/sql_parse.h>
 
23
#include <drizzled/session.h>
 
24
#include "session_scheduler.h"
 
25
#include <string>
 
26
#include <event.h>
 
27
using namespace std;
 
28
 
 
29
static uint32_t created_threads, killed_threads;
 
30
static bool kill_pool_threads;
 
31
 
 
32
static struct event session_add_event;
 
33
static struct event session_kill_event;
 
34
 
 
35
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
 
36
static LIST *sessions_need_adding= NULL;    /* list of sessions to add to libevent queue */
 
37
 
 
38
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
 
39
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
 
40
 
 
41
/*
 
42
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and
 
43
  event_del) and sessions_need_processing and sessions_waiting_for_io.
 
44
*/
 
45
static pthread_mutex_t LOCK_event_loop;
 
46
static LIST *sessions_need_processing; /* list of sessions that needs some processing */
 
47
static LIST *sessions_waiting_for_io; /* list of sessions with added events */
 
48
 
 
49
pthread_handler_t libevent_thread_proc(void *arg);
 
50
static void libevent_end();
 
51
static bool libevent_needs_immediate_processing(Session *session);
 
52
static void libevent_connection_close(Session *session);
 
53
void libevent_session_add(Session* session);
 
54
bool libevent_should_close_connection(Session* session);
 
55
void libevent_io_callback(int Fd, short Operation, void *ctx);
 
56
void libevent_add_session_callback(int Fd, short Operation, void *ctx);
 
57
void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
 
58
 
 
59
static uint32_t size= 0;
 
60
 
 
61
/*
 
62
  Create a pipe and set to non-blocking.
 
63
  Returns true if there is an error.
 
64
*/
 
65
 
 
66
static bool init_pipe(int pipe_fds[])
 
67
{
 
68
  int flags;
 
69
  return pipe(pipe_fds) < 0 ||
 
70
          (flags= fcntl(pipe_fds[0], F_GETFL)) == -1 ||
 
71
          fcntl(pipe_fds[0], F_SETFL, flags | O_NONBLOCK) == -1;
 
72
          (flags= fcntl(pipe_fds[1], F_GETFL)) == -1 ||
 
73
          fcntl(pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1;
 
74
}
 
75
 
 
76
 
 
77
 
 
78
/**
 
79
  Create all threads for the thread pool
 
80
 
 
81
  NOTES
 
82
    After threads are created we wait until all threads has signaled that
 
83
    they have started before we return
 
84
 
 
85
  RETURN
 
86
    0  ok
 
87
    1  We got an error creating the thread pool
 
88
       In this case we will abort all created threads
 
89
*/
 
90
 
 
91
static bool libevent_init(void)
 
92
{
 
93
  uint32_t i;
 
94
 
 
95
  event_init();
 
96
 
 
97
  created_threads= 0;
 
98
  killed_threads= 0;
 
99
  kill_pool_threads= false;
 
100
 
 
101
  pthread_mutex_init(&LOCK_event_loop, NULL);
 
102
  pthread_mutex_init(&LOCK_session_add, NULL);
 
103
 
 
104
  /* set up the pipe used to add new sessions to the event pool */
 
105
  if (init_pipe(session_add_pipe))
 
106
  {
 
107
    errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_add_pipe) error in libevent_init\n"));
 
108
    return(1);
 
109
  }
 
110
  /* set up the pipe used to kill sessions in the event queue */
 
111
  if (init_pipe(session_kill_pipe))
 
112
  {
 
113
    errmsg_printf(ERRMSG_LVL_ERROR, _("init_pipe(session_kill_pipe) error in libevent_init\n"));
 
114
    close(session_add_pipe[0]);
 
115
    close(session_add_pipe[1]);
 
116
    return(1);
 
117
  }
 
118
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
 
119
            libevent_add_session_callback, NULL);
 
120
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
 
121
            libevent_kill_session_callback, NULL);
 
122
 
 
123
 if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
 
124
 {
 
125
   errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
 
126
   libevent_end();
 
127
   return(1);
 
128
 
 
129
 }
 
130
  /* Set up the thread pool */
 
131
  created_threads= killed_threads= 0;
 
132
  pthread_mutex_lock(&LOCK_thread_count);
 
133
 
 
134
  for (i= 0; i < thread_pool_size; i++)
 
135
  {
 
136
    pthread_t thread;
 
137
    int error;
 
138
    if ((error= pthread_create(&thread, &connection_attrib,
 
139
                               libevent_thread_proc, 0)))
 
140
    {
 
141
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
 
142
                      error);
 
143
      pthread_mutex_unlock(&LOCK_thread_count);
 
144
      libevent_end();                      // Cleanup
 
145
      return(true);
 
146
    }
 
147
  }
 
148
 
 
149
  /* Wait until all threads are created */
 
150
  while (created_threads != thread_pool_size)
 
151
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
 
152
  pthread_mutex_unlock(&LOCK_thread_count);
 
153
 
 
154
  return(false);
 
155
}
 
156
 
 
157
 
 
158
/*
 
159
  This is called when data is ready on the socket.
 
160
 
 
161
  NOTES
 
162
    This is only called by the thread that owns LOCK_event_loop.
 
163
 
 
164
    We add the session that got the data to sessions_need_processing, and
 
165
    cause the libevent event_loop() to terminate. Then this same thread will
 
166
    return from event_loop and pick the session value back up for processing.
 
167
*/
 
168
 
 
169
void libevent_io_callback(int, short, void *ctx)
 
170
{
 
171
  safe_mutex_assert_owner(&LOCK_event_loop);
 
172
  Session *session= (Session*)ctx;
 
173
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
174
  assert(scheduler);
 
175
  sessions_waiting_for_io= list_delete(sessions_waiting_for_io, &scheduler->list);
 
176
  sessions_need_processing= list_add(sessions_need_processing, &scheduler->list);
 
177
}
 
178
 
 
179
/*
 
180
  This is called when we have a thread we want to be killed.
 
181
 
 
182
  NOTES
 
183
    This is only called by the thread that owns LOCK_event_loop.
 
184
*/
 
185
 
 
186
void libevent_kill_session_callback(int Fd, short, void*)
 
187
{
 
188
  safe_mutex_assert_owner(&LOCK_event_loop);
 
189
 
 
190
  /* clear the pending events */
 
191
  char c;
 
192
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
193
  {}
 
194
 
 
195
  LIST* list= sessions_waiting_for_io;
 
196
  while (list)
 
197
  {
 
198
    Session *session= (Session*)list->data;
 
199
    list= list_rest(list);
 
200
    if (session->killed == Session::KILL_CONNECTION)
 
201
    {
 
202
      session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
203
      assert(scheduler);
 
204
      /*
 
205
        Delete from libevent and add to the processing queue.
 
206
      */
 
207
      event_del(scheduler->io_event);
 
208
      sessions_waiting_for_io= list_delete(sessions_waiting_for_io,
 
209
                                           &scheduler->list);
 
210
      sessions_need_processing= list_add(sessions_need_processing,
 
211
                                         &scheduler->list);
 
212
    }
 
213
  }
 
214
}
 
215
 
 
216
 
 
217
/*
 
218
  This is used to add connections to the pool. This callback is invoked from
 
219
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
 
220
  written to it.
 
221
 
 
222
  NOTES
 
223
    This is only called by the thread that owns LOCK_event_loop.
 
224
*/
 
225
 
 
226
void libevent_add_session_callback(int Fd, short, void *)
 
227
{
 
228
  safe_mutex_assert_owner(&LOCK_event_loop);
 
229
 
 
230
  /* clear the pending events */
 
231
  char c;
 
232
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
 
233
  {}
 
234
 
 
235
  pthread_mutex_lock(&LOCK_session_add);
 
236
  while (sessions_need_adding)
 
237
  {
 
238
    /* pop the first session off the list */
 
239
    Session* session= (Session*)sessions_need_adding->data;
 
240
    sessions_need_adding= list_delete(sessions_need_adding, sessions_need_adding);
 
241
    session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
242
    assert(scheduler);
 
243
 
 
244
    pthread_mutex_unlock(&LOCK_session_add);
 
245
 
 
246
    if (!scheduler->logged_in || libevent_should_close_connection(session))
 
247
    {
 
248
      /*
 
249
        Add session to sessions_need_processing list. If it needs closing we'll close
 
250
        it outside of event_loop().
 
251
      */
 
252
      sessions_need_processing= list_add(sessions_need_processing,
 
253
                                         &scheduler->list);
 
254
    }
 
255
    else
 
256
    {
 
257
      /* Add to libevent */
 
258
      if (event_add(scheduler->io_event, NULL))
 
259
      {
 
260
        errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
 
261
        libevent_connection_close(session);
 
262
      }
 
263
      else
 
264
      {
 
265
        sessions_waiting_for_io= list_add(sessions_waiting_for_io,
 
266
                                          &scheduler->list);
 
267
      }
 
268
    }
 
269
    pthread_mutex_lock(&LOCK_session_add);
 
270
  }
 
271
  pthread_mutex_unlock(&LOCK_session_add);
 
272
}
 
273
 
 
274
 
 
275
/**
 
276
  Notify the thread pool about a new connection
 
277
 
 
278
  NOTES
 
279
    LOCK_thread_count is locked on entry. This function MUST unlock it!
 
280
*/
 
281
 
 
282
static void libevent_add_connection(Session *session)
 
283
{
 
284
  assert(session->scheduler == NULL);
 
285
  session_scheduler *scheduler= new session_scheduler;
 
286
 
 
287
  session->scheduler= (void *)scheduler;
 
288
 
 
289
  if (scheduler->init(session))
 
290
  {
 
291
    errmsg_printf(ERRMSG_LVL_ERROR, _("Scheduler init error in libevent_add_new_connection\n"));
 
292
    pthread_mutex_unlock(&LOCK_thread_count);
 
293
    libevent_connection_close(session);
 
294
 
 
295
    return;
 
296
  }
 
297
  threads.append(session);
 
298
  libevent_session_add(session);
 
299
 
 
300
  pthread_mutex_unlock(&LOCK_thread_count);
 
301
  return;
 
302
}
 
303
 
 
304
 
 
305
/**
 
306
  @brief Signal a waiting connection it's time to die.
 
307
 
 
308
  @details This function will signal libevent the Session should be killed.
 
309
    Either the global LOCK_session_count or the Session's LOCK_delete must be locked
 
310
    upon entry.
 
311
 
 
312
  @param[in]  session The connection to kill
 
313
*/
 
314
 
 
315
static void libevent_post_kill_notification(Session *)
 
316
{
 
317
  /*
 
318
    Note, we just wake up libevent with an event that a Session should be killed,
 
319
    It will search its list of sessions for session->killed ==  KILL_CONNECTION to
 
320
    find the Sessions it should kill.
 
321
 
 
322
    So we don't actually tell it which one and we don't actually use the
 
323
    Session being passed to us, but that's just a design detail that could change
 
324
    later.
 
325
  */
 
326
  char c= 0;
 
327
  assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
 
328
}
 
329
 
 
330
 
 
331
/*
 
332
  Close and delete a connection.
 
333
*/
 
334
 
 
335
static void libevent_connection_close(Session *session)
 
336
{
 
337
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
338
  assert(scheduler);
 
339
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages
 
340
 
 
341
  if (net_get_sd(&(session->net)) >= 0)                  // not already closed
 
342
  {
 
343
    end_connection(session);
 
344
    session->close_connection(0, 1);
 
345
  }
 
346
  scheduler->thread_detach();
 
347
  
 
348
  delete scheduler;
 
349
  session->scheduler= NULL;
 
350
 
 
351
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
 
352
  pthread_mutex_unlock(&LOCK_thread_count);
 
353
 
 
354
  return;
 
355
}
 
356
 
 
357
 
 
358
/*
 
359
  Returns true if we should close and delete a Session connection.
 
360
*/
 
361
 
 
362
bool libevent_should_close_connection(Session* session)
 
363
{
 
364
  return net_should_close(&(session->net)) ||
 
365
         session->killed == Session::KILL_CONNECTION;
 
366
}
 
367
 
 
368
 
 
369
/*
 
370
  libevent_thread_proc is the outer loop of each thread in the thread pool.
 
371
  These procs only return/terminate on shutdown (kill_pool_threads == true).
 
372
*/
 
373
 
 
374
pthread_handler_t libevent_thread_proc(void *arg __attribute__((unused)))
 
375
{
 
376
  if (init_new_connection_handler_thread())
 
377
  {
 
378
    my_thread_global_end();
 
379
    errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: my_thread_init() failed\n"));
 
380
    exit(1);
 
381
  }
 
382
 
 
383
  /*
 
384
    Signal libevent_init() when all threads has been created and are ready to
 
385
    receive events.
 
386
  */
 
387
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
388
  created_threads++;
 
389
  if (created_threads == thread_pool_size)
 
390
    (void) pthread_cond_signal(&COND_thread_count);
 
391
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
392
 
 
393
  for (;;)
 
394
  {
 
395
    Session *session= NULL;
 
396
    (void) pthread_mutex_lock(&LOCK_event_loop);
 
397
 
 
398
    /* get session(s) to process */
 
399
    while (!sessions_need_processing)
 
400
    {
 
401
      if (kill_pool_threads)
 
402
      {
 
403
        /* the flag that we should die has been set */
 
404
        (void) pthread_mutex_unlock(&LOCK_event_loop);
 
405
        goto thread_exit;
 
406
      }
 
407
      event_loop(EVLOOP_ONCE);
 
408
    }
 
409
 
 
410
    /* pop the first session off the list */
 
411
    session= (Session*)sessions_need_processing->data;
 
412
    sessions_need_processing= list_delete(sessions_need_processing,
 
413
                                      sessions_need_processing);
 
414
    session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
415
 
 
416
    (void) pthread_mutex_unlock(&LOCK_event_loop);
 
417
 
 
418
    /* now we process the connection (session) */
 
419
 
 
420
    /* set up the session<->thread links. */
 
421
    session->thread_stack= (char*) &session;
 
422
 
 
423
    if (scheduler->thread_attach())
 
424
    {
 
425
      libevent_connection_close(session);
 
426
      continue;
 
427
    }
 
428
 
 
429
    /* is the connection logged in yet? */
 
430
    if (!scheduler->logged_in)
 
431
    {
 
432
      if (login_connection(session))
 
433
      {
 
434
        /* Failed to log in */
 
435
        libevent_connection_close(session);
 
436
        continue;
 
437
      }
 
438
      else
 
439
      {
 
440
        /* login successful */
 
441
        scheduler->logged_in= true;
 
442
        prepare_new_connection_state(session);
 
443
        if (!libevent_needs_immediate_processing(session))
 
444
          continue; /* New connection is now waiting for data in libevent*/
 
445
      }
 
446
    }
 
447
 
 
448
    do
 
449
    {
 
450
      /* Process a query */
 
451
      if (do_command(session))
 
452
      {
 
453
        libevent_connection_close(session);
 
454
        break;
 
455
      }
 
456
    } while (libevent_needs_immediate_processing(session));
 
457
  }
 
458
 
 
459
thread_exit:
 
460
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
461
  killed_threads++;
 
462
  pthread_cond_broadcast(&COND_thread_count);
 
463
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
464
  my_thread_end();
 
465
  pthread_exit(0);
 
466
  return(0);                               /* purify: deadcode */
 
467
}
 
468
 
 
469
 
 
470
/*
 
471
  Returns true if the connection needs immediate processing and false if
 
472
  instead it's queued for libevent processing or closed,
 
473
*/
 
474
 
 
475
static bool libevent_needs_immediate_processing(Session *session)
 
476
{
 
477
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
478
 
 
479
  if (libevent_should_close_connection(session))
 
480
  {
 
481
    libevent_connection_close(session);
 
482
    return false;
 
483
  }
 
484
  /*
 
485
    If more data in the socket buffer, return true to process another command.
 
486
 
 
487
    Note: we cannot add for event processing because the whole request might
 
488
    already be buffered and we wouldn't receive an event.
 
489
  */
 
490
  if (net_more_data(&(session->net)))
 
491
    return true;
 
492
 
 
493
  scheduler->thread_detach();
 
494
  libevent_session_add(session);
 
495
 
 
496
  return false;
 
497
}
 
498
 
 
499
 
 
500
/*
 
501
  Adds a Session to queued for libevent processing.
 
502
 
 
503
  This call does not actually register the event with libevent.
 
504
  Instead, it places the Session onto a queue and signals libevent by writing
 
505
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
 
506
  be invoked which will find the Session on the queue and add it to libevent.
 
507
*/
 
508
 
 
509
void libevent_session_add(Session* session)
 
510
{
 
511
  char c= 0;
 
512
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
 
513
  assert(scheduler);
 
514
 
 
515
  pthread_mutex_lock(&LOCK_session_add);
 
516
  /* queue for libevent */
 
517
  sessions_need_adding= list_add(sessions_need_adding, &scheduler->list);
 
518
  /* notify libevent */
 
519
  assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
520
  pthread_mutex_unlock(&LOCK_session_add);
 
521
}
 
522
 
 
523
 
 
524
/**
 
525
  Wait until all pool threads have been deleted for clean shutdown
 
526
*/
 
527
 
 
528
static void libevent_end()
 
529
{
 
530
  (void) pthread_mutex_lock(&LOCK_thread_count);
 
531
 
 
532
  kill_pool_threads= true;
 
533
  while (killed_threads != created_threads)
 
534
  {
 
535
    /* wake up the event loop */
 
536
    char c= 0;
 
537
    assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
 
538
 
 
539
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
540
  }
 
541
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
542
 
 
543
  event_del(&session_add_event);
 
544
  close(session_add_pipe[0]);
 
545
  close(session_add_pipe[1]);
 
546
  event_del(&session_kill_event);
 
547
  close(session_kill_pipe[0]);
 
548
  close(session_kill_pipe[1]);
 
549
 
 
550
  (void) pthread_mutex_destroy(&LOCK_event_loop);
 
551
  (void) pthread_mutex_destroy(&LOCK_session_add);
 
552
  return;
 
553
}
 
554
 
 
555
static int init(void *p)
 
556
{
 
557
  scheduler_functions* func= (scheduler_functions *)p;
 
558
 
 
559
  assert(size != 0);
 
560
  func->max_threads= size;
 
561
  func->init= libevent_init;
 
562
  func->end=  libevent_end;
 
563
  func->post_kill_notification= libevent_post_kill_notification;
 
564
  func->add_connection= libevent_add_connection;
 
565
 
 
566
  return 0;
 
567
}
 
568
 
 
569
static int deinit(void *)
 
570
{
 
571
  return 0;
 
572
}
 
573
 
 
574
/* 
 
575
  The defaults here were picked based on what I see (aka Brian). They should
 
576
  be vetted across a larger audience.
 
577
*/
 
578
static DRIZZLE_SYSVAR_UINT(size, size,
 
579
                           PLUGIN_VAR_RQCMDARG,
 
580
                           N_("Size of Pool."),
 
581
                           NULL, NULL, 8, 1, 1024, 0);
 
582
 
 
583
static struct st_mysql_sys_var* system_variables[]= {
 
584
  DRIZZLE_SYSVAR(size),
 
585
  NULL,
 
586
};
 
587
 
 
588
mysql_declare_plugin(pool_of_threads)
 
589
{
 
590
  DRIZZLE_SCHEDULING_PLUGIN,
 
591
  "pool_of_threads",
 
592
  "0.1",
 
593
  "Brian Aker",
 
594
  "Pool of Threads Scheduler",
 
595
  PLUGIN_LICENSE_GPL,
 
596
  init, /* Plugin Init */
 
597
  deinit, /* Plugin Deinit */
 
598
  NULL,   /* status variables */
 
599
  system_variables,   /* system variables */
 
600
  NULL    /* config options */
 
601
}
 
602
mysql_declare_plugin_end;