~drizzle-trunk/drizzle/development

803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
1
/* Copyright (C) 2006 MySQL AB
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
16
#include <drizzled/server_includes.h>
17
#include <drizzled/gettext.h>
18
#include <drizzled/error.h>
960.2.22 by Monty Taylor
Renamed a bunch of plugin files.
19
#include <drizzled/plugin/scheduler.h>
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
20
#include <drizzled/serialize/serialize.h>
21
#include <drizzled/connect.h>
22
#include <drizzled/sql_parse.h>
23
#include <drizzled/session.h>
24
#include "session_scheduler.h"
25
#include <string>
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
26
#include <list>
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
27
#include <event.h>
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
28
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
29
using namespace std;
30
960.1.1 by Monty Taylor
First pass at scheduler plugin.
31
static volatile bool kill_pool_threads= false;
32
929.1.1 by Brian Aker
Push thread count out to the scheduler.
33
static volatile uint32_t created_threads= 0;
874 by Brian Aker
Refactor out function indirection in pool_of_threads.
34
static int deinit(void *);
35
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
36
static struct event session_add_event;
37
static struct event session_kill_event;
38
39
static pthread_mutex_t LOCK_session_add;    /* protects sessions_need_adding */
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
40
static list<Session *> sessions_need_adding; /* list of sessions to add to libevent queue */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
41
42
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
43
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
44
45
/*
46
  LOCK_event_loop protects the non-thread safe libevent calls (event_add and
47
  event_del) and sessions_need_processing and sessions_waiting_for_io.
48
*/
49
static pthread_mutex_t LOCK_event_loop;
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
50
static list<Session *> sessions_need_processing; /* list of sessions that needs some processing */
51
static list<Session *> sessions_waiting_for_io; /* list of sessions with added events */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
52
53
static bool libevent_needs_immediate_processing(Session *session);
54
static void libevent_connection_close(Session *session);
55
void libevent_session_add(Session* session);
56
bool libevent_should_close_connection(Session* session);
779.3.23 by Monty Taylor
More fixy-fixes.
57
extern "C" {
58
  pthread_handler_t libevent_thread_proc(void *arg);
59
  void libevent_io_callback(int Fd, short Operation, void *ctx);
60
  void libevent_add_session_callback(int Fd, short Operation, void *ctx);
61
  void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
62
}
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
63
64
static uint32_t size= 0;
65
66
/*
  Put one descriptor into non-blocking mode.
  Returns true if there is an error.
*/
static bool init_pipe_set_nonblocking(int fd)
{
  int flags= fcntl(fd, F_GETFL);
  if (flags == -1)
    return true;
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1;
}

/*
  Create a pipe and set both ends to non-blocking.

  pipe_fds receives the read end in [0] and the write end in [1].

  Returns true if there is an error. In that case nothing is left open and
  both entries of pipe_fds are set to -1, so a caller that later closes the
  pair unconditionally can never hit a stale or unrelated descriptor.
*/
static bool init_pipe(int pipe_fds[])
{
  if (pipe(pipe_fds) < 0)
  {
    pipe_fds[0]= pipe_fds[1]= -1;
    return true;
  }

  if (init_pipe_set_nonblocking(pipe_fds[0]) ||
      init_pipe_set_nonblocking(pipe_fds[1]))
  {
    /* The pipe exists but is unusable for us: release both ends. */
    close(pipe_fds[0]);
    close(pipe_fds[1]);
    pipe_fds[0]= pipe_fds[1]= -1;
    return true;
  }

  return false;
}
80
81
82
83
84
85
/*
86
  This is called when data is ready on the socket.
87
88
  NOTES
89
    This is only called by the thread that owns LOCK_event_loop.
90
91
    We add the session that got the data to sessions_need_processing, and
92
    cause the libevent event_loop() to terminate. Then this same thread will
93
    return from event_loop and pick the session value back up for processing.
94
*/
95
96
void libevent_io_callback(int, short, void *ctx)
97
{
98
  safe_mutex_assert_owner(&LOCK_event_loop);
99
  Session *session= (Session*)ctx;
100
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
101
  assert(scheduler);
916.1.7 by Padraig O'Sullivan
Giving session_scheduler class member a better name based on re-factoring.
102
  sessions_waiting_for_io.remove(scheduler->session);
103
  sessions_need_processing.push_front(scheduler->session);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
104
}
105
106
/*
916.1.4 by Padraig O'Sullivan
Making the removal of elements from the list safe.
107
  Function object which is used to determine whether to remove
108
  a session from the sessions_waiting_for_io list.
109
*/
110
class remove_session_if
111
{
112
  public:
113
  remove_session_if() { }
114
  inline bool operator()(const Session *session) const
115
  {
916.1.5 by Padraig O'Sullivan
Duh, stupid syntax error.
116
    return (session->killed == Session::KILL_CONNECTION);
916.1.4 by Padraig O'Sullivan
Making the removal of elements from the list safe.
117
  }
118
};
119
120
/*
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
121
  This is called when we have a thread we want to be killed.
122
123
  NOTES
124
    This is only called by the thread that owns LOCK_event_loop.
125
*/
126
127
void libevent_kill_session_callback(int Fd, short, void*)
128
{
129
  safe_mutex_assert_owner(&LOCK_event_loop);
130
131
  /* clear the pending events */
132
  char c;
133
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
134
  {}
135
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
136
  list<Session *>::iterator it= sessions_waiting_for_io.begin();
137
  while (it != sessions_waiting_for_io.end())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
138
  {
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
139
    Session *session= *it;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
140
    if (session->killed == Session::KILL_CONNECTION)
141
    {
142
      session_scheduler *scheduler= (session_scheduler *)session->scheduler;
143
      assert(scheduler);
144
      /*
145
        Delete from libevent and add to the processing queue.
146
      */
805 by Brian Aker
Refactor init/deinit into the normal startup structures for a plugin.
147
      event_del(&scheduler->io_event);
916.1.7 by Padraig O'Sullivan
Giving session_scheduler class member a better name based on re-factoring.
148
      sessions_need_processing.push_front(scheduler->session);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
149
    }
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
150
    ++it;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
151
  }
916.1.4 by Padraig O'Sullivan
Making the removal of elements from the list safe.
152
153
  /* 
154
    safely remove elements from the sessions_waiting_for_io list
155
  */
156
  sessions_waiting_for_io.erase(std::remove_if(sessions_waiting_for_io.begin(),
919.2.1 by Monty Taylor
Merged from Padraig: removal of LIST in pool_of_threads.
157
                                               sessions_waiting_for_io.end(),
158
                                               remove_session_if()),
916.1.4 by Padraig O'Sullivan
Making the removal of elements from the list safe.
159
                                sessions_waiting_for_io.end());
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
160
}
161
162
163
/*
164
  This is used to add connections to the pool. This callback is invoked from
165
  the libevent event_loop() call whenever the session_add_pipe[1] pipe has a byte
166
  written to it.
167
168
  NOTES
169
    This is only called by the thread that owns LOCK_event_loop.
170
*/
171
172
void libevent_add_session_callback(int Fd, short, void *)
173
{
174
  safe_mutex_assert_owner(&LOCK_event_loop);
175
176
  /* clear the pending events */
177
  char c;
178
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
179
  {}
180
181
  pthread_mutex_lock(&LOCK_session_add);
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
182
  while (!sessions_need_adding.empty())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
183
  {
184
    /* pop the first session off the list */
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
185
    Session* session= sessions_need_adding.front();
186
    sessions_need_adding.pop_front();
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
187
    session_scheduler *scheduler= (session_scheduler *)session->scheduler;
188
    assert(scheduler);
189
190
    pthread_mutex_unlock(&LOCK_session_add);
191
192
    if (!scheduler->logged_in || libevent_should_close_connection(session))
193
    {
194
      /*
195
        Add session to sessions_need_processing list. If it needs closing we'll close
196
        it outside of event_loop().
197
      */
916.1.7 by Padraig O'Sullivan
Giving session_scheduler class member a better name based on re-factoring.
198
      sessions_need_processing.push_front(scheduler->session);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
199
    }
200
    else
201
    {
202
      /* Add to libevent */
805 by Brian Aker
Refactor init/deinit into the normal startup structures for a plugin.
203
      if (event_add(&scheduler->io_event, NULL))
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
204
      {
205
        errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
206
        libevent_connection_close(session);
207
      }
208
      else
209
      {
916.1.7 by Padraig O'Sullivan
Giving session_scheduler class member a better name based on re-factoring.
210
        sessions_waiting_for_io.push_front(scheduler->session);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
211
      }
212
    }
213
    pthread_mutex_lock(&LOCK_session_add);
214
  }
215
  pthread_mutex_unlock(&LOCK_session_add);
216
}
217
218
960.1.1 by Monty Taylor
First pass at scheduler plugin.
219
class Pool_of_threads_scheduler: public Scheduler
220
{
221
private:
222
  pthread_attr_t thread_attrib;
223
224
public:
225
  Pool_of_threads_scheduler(uint32_t max_size_in)
226
    : Scheduler(max_size_in)
227
  {
228
    /* Parameter for threads created for connections */
229
    (void) pthread_attr_init(&thread_attrib);
230
    (void) pthread_attr_setdetachstate(&thread_attrib,
231
  				     PTHREAD_CREATE_DETACHED);
232
    pthread_attr_setscope(&thread_attrib, PTHREAD_SCOPE_SYSTEM);
233
    {
234
      struct sched_param tmp_sched_param;
235
  
236
      memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
237
      tmp_sched_param.sched_priority= WAIT_PRIOR;
238
      (void)pthread_attr_setschedparam(&thread_attrib, &tmp_sched_param);
239
    }
240
  }
241
242
  ~Pool_of_threads_scheduler()
243
  {
244
    (void) pthread_mutex_lock(&LOCK_thread_count);
245
  
246
    kill_pool_threads= true;
247
    while (created_threads)
248
    {
249
      /* wake up the event loop */
250
      char c= 0;
251
      assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
252
  
253
      pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
254
    }
255
    (void) pthread_mutex_unlock(&LOCK_thread_count);
256
  
257
    event_del(&session_add_event);
258
    close(session_add_pipe[0]);
259
    close(session_add_pipe[1]);
260
    event_del(&session_kill_event);
261
    close(session_kill_pipe[0]);
262
    close(session_kill_pipe[1]);
263
  
264
    (void) pthread_mutex_destroy(&LOCK_event_loop);
265
    (void) pthread_mutex_destroy(&LOCK_session_add);
266
  }
267
268
  /**
269
    Notify the thread pool about a new connection
270
  
271
    NOTES
272
      LOCK_thread_count is locked on entry. This function MUST unlock it!
273
  */
274
  
275
  virtual bool add_connection(Session *session)
276
  {
277
    assert(session->scheduler == NULL);
278
    session_scheduler *scheduler= new session_scheduler(session);
279
  
280
    if (scheduler == NULL)
281
      return true;
282
  
283
    session->scheduler= (void *)scheduler;
284
  
285
    libevent_session_add(session);
286
  
287
    return false;
288
  }
289
  
290
  
291
  /**
292
    @brief Signal a waiting connection it's time to die.
293
  
294
    @details This function will signal libevent the Session should be killed.
295
      Either the global LOCK_session_count or the Session's LOCK_delete must be locked
296
      upon entry.
297
  
298
    @param[in]  session The connection to kill
299
  */
300
  
301
  virtual void post_kill_notification(Session *)
302
  {
303
    /*
304
      Note, we just wake up libevent with an event that a Session should be killed,
305
      It will search its list of sessions for session->killed ==  KILL_CONNECTION to
306
      find the Sessions it should kill.
307
  
308
      So we don't actually tell it which one and we don't actually use the
309
      Session being passed to us, but that's just a design detail that could change
310
      later.
311
    */
312
    char c= 0;
313
    assert(write(session_kill_pipe[1], &c, sizeof(c))==sizeof(c));
314
  }
315
316
  virtual uint32_t count(void)
317
  {
318
    return created_threads;
319
  }
320
321
  /**
322
    Create all threads for the thread pool
323
  
324
    NOTES
325
      After threads are created we wait until all threads has signaled that
326
      they have started before we return
327
  
328
    RETURN
329
      0  ok
330
      1  We got an error creating the thread pool
331
         In this case we will abort all created threads
332
  */
333
  
334
  bool libevent_init(void)
335
  {
336
    uint32_t x;
337
  
338
    event_init();
339
  
340
    pthread_mutex_init(&LOCK_event_loop, NULL);
341
    pthread_mutex_init(&LOCK_session_add, NULL);
342
  
343
    /* set up the pipe used to add new sessions to the event pool */
344
    if (init_pipe(session_add_pipe))
345
    {
346
      errmsg_printf(ERRMSG_LVL_ERROR,
347
                    _("init_pipe(session_add_pipe) error in libevent_init\n"));
348
      return true;
349
    }
350
    /* set up the pipe used to kill sessions in the event queue */
351
    if (init_pipe(session_kill_pipe))
352
    {
353
      errmsg_printf(ERRMSG_LVL_ERROR,
354
                    _("init_pipe(session_kill_pipe) error in libevent_init\n"));
355
      close(session_add_pipe[0]);
356
      close(session_add_pipe[1]);
357
     return true;
358
    }
359
    event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
360
              libevent_add_session_callback, NULL);
361
    event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
362
              libevent_kill_session_callback, NULL);
363
  
364
   if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
365
   {
366
     errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
367
     deinit(NULL);
368
     return true;
369
  
370
   }
371
    /* Set up the thread pool */
372
    pthread_mutex_lock(&LOCK_thread_count);
373
  
374
    for (x= 0; x < size; x++)
375
    {
376
      pthread_t thread;
377
      int error;
378
      if ((error= pthread_create(&thread, &thread_attrib, libevent_thread_proc, 0)))
379
      {
380
        errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
381
                        error);
382
        pthread_mutex_unlock(&LOCK_thread_count);
383
        deinit(NULL);                      // Cleanup
384
        return true;
385
      }
386
    }
387
  
388
    /* Wait until all threads are created */
389
    while (created_threads != size)
390
      pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
391
    pthread_mutex_unlock(&LOCK_thread_count);
392
  
393
    return false;
394
  }
395
}; 
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
396
397
/*
398
  Close and delete a connection.
399
*/
400
401
static void libevent_connection_close(Session *session)
402
{
403
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
404
  assert(scheduler);
405
  session->killed= Session::KILL_CONNECTION;          // Avoid error messages
406
840.1.20 by Monty Taylor
Renamed non-prefixed things from libdrizzleclient to drizzleclient.
407
  if (drizzleclient_net_get_sd(&(session->net)) >= 0)                  // not already closed
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
408
  {
934.2.6 by Jay Pipes
This changeset removes a few more C functions from sql_connect.cc/connect.h
409
    session->disconnect(0, true);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
410
  }
411
  scheduler->thread_detach();
412
  
413
  delete scheduler;
414
  session->scheduler= NULL;
415
938 by Brian Aker
Merge of Monty/Eric/Padraig
416
  unlink_session(session);   /* locks LOCK_thread_count and deletes session */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
417
418
  return;
419
}
420
421
422
/*
423
  Returns true if we should close and delete a Session connection.
424
*/
425
426
bool libevent_should_close_connection(Session* session)
427
{
840.1.20 by Monty Taylor
Renamed non-prefixed things from libdrizzleclient to drizzleclient.
428
  return drizzleclient_net_should_close(&(session->net)) ||
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
429
         session->killed == Session::KILL_CONNECTION;
430
}
431
432
433
/*
434
  libevent_thread_proc is the outer loop of each thread in the thread pool.
435
  These procs only return/terminate on shutdown (kill_pool_threads == true).
436
*/
437
779.3.23 by Monty Taylor
More fixy-fixes.
438
pthread_handler_t libevent_thread_proc(void *)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
439
{
866 by Brian Aker
Remove un-needed depth (real simple.. working on adding back other
440
  if (my_thread_init())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
441
  {
442
    my_thread_global_end();
443
    errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: my_thread_init() failed\n"));
444
    exit(1);
445
  }
446
447
  /*
448
    Signal libevent_init() when all threads has been created and are ready to
449
    receive events.
450
  */
451
  (void) pthread_mutex_lock(&LOCK_thread_count);
452
  created_threads++;
808 by Brian Aker
Move number of threads to use for pool of threads to module. Removed slave
453
  if (created_threads == size)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
454
    (void) pthread_cond_signal(&COND_thread_count);
455
  (void) pthread_mutex_unlock(&LOCK_thread_count);
456
457
  for (;;)
458
  {
459
    Session *session= NULL;
460
    (void) pthread_mutex_lock(&LOCK_event_loop);
461
462
    /* get session(s) to process */
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
463
    while (sessions_need_processing.empty())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
464
    {
465
      if (kill_pool_threads)
466
      {
467
        /* the flag that we should die has been set */
468
        (void) pthread_mutex_unlock(&LOCK_event_loop);
469
        goto thread_exit;
470
      }
471
      event_loop(EVLOOP_ONCE);
472
    }
473
474
    /* pop the first session off the list */
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
475
    session= sessions_need_processing.front();
476
    sessions_need_processing.pop_front();
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
477
    session_scheduler *scheduler= (session_scheduler *)session->scheduler;
478
479
    (void) pthread_mutex_unlock(&LOCK_event_loop);
480
481
    /* now we process the connection (session) */
482
483
    /* set up the session<->thread links. */
484
    session->thread_stack= (char*) &session;
485
486
    if (scheduler->thread_attach())
487
    {
488
      libevent_connection_close(session);
489
      continue;
490
    }
491
492
    /* is the connection logged in yet? */
493
    if (!scheduler->logged_in)
494
    {
934.2.4 by Jay Pipes
This changeset pulls check_user(), check_connection(), and login_connection() out of sql_connect.cc and makes them member methods of Session, where they belong. Also, made sure that functions that return a bool return true when it succeeds, and not false...
495
      if (! session->authenticate())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
496
      {
497
        /* Failed to log in */
498
        libevent_connection_close(session);
499
        continue;
500
      }
501
      else
502
      {
503
        /* login successful */
504
        scheduler->logged_in= true;
934.2.6 by Jay Pipes
This changeset removes a few more C functions from sql_connect.cc/connect.h
505
        session->prepareForQueries();
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
506
        if (!libevent_needs_immediate_processing(session))
507
          continue; /* New connection is now waiting for data in libevent*/
508
      }
509
    }
510
511
    do
512
    {
513
      /* Process a query */
934.2.8 by Jay Pipes
Refactors the do_command() function out of the sql_parse.cc stuff and implements it as a member method, executeStatement() on the Session object.
514
      if (! session->executeStatement())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
515
      {
516
        libevent_connection_close(session);
517
        break;
518
      }
519
    } while (libevent_needs_immediate_processing(session));
929.1.1 by Brian Aker
Push thread count out to the scheduler.
520
521
    if (kill_pool_threads) /* the flag that we should die has been set */
522
      goto thread_exit;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
523
  }
524
525
thread_exit:
526
  (void) pthread_mutex_lock(&LOCK_thread_count);
929.1.1 by Brian Aker
Push thread count out to the scheduler.
527
  created_threads--;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
528
  pthread_cond_broadcast(&COND_thread_count);
529
  (void) pthread_mutex_unlock(&LOCK_thread_count);
530
  my_thread_end();
531
  pthread_exit(0);
929.1.1 by Brian Aker
Push thread count out to the scheduler.
532
533
  return NULL;                               /* purify: deadcode */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
534
}
535
536
537
/*
538
  Returns true if the connection needs immediate processing and false if
539
  instead it's queued for libevent processing or closed,
540
*/
541
542
static bool libevent_needs_immediate_processing(Session *session)
543
{
544
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
545
546
  if (libevent_should_close_connection(session))
547
  {
548
    libevent_connection_close(session);
549
    return false;
550
  }
551
  /*
552
    If more data in the socket buffer, return true to process another command.
553
554
    Note: we cannot add for event processing because the whole request might
555
    already be buffered and we wouldn't receive an event.
556
  */
840.1.20 by Monty Taylor
Renamed non-prefixed things from libdrizzleclient to drizzleclient.
557
  if (drizzleclient_net_more_data(&(session->net)))
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
558
    return true;
559
560
  scheduler->thread_detach();
561
  libevent_session_add(session);
562
563
  return false;
564
}
565
566
567
/*
568
  Adds a Session to queued for libevent processing.
569
570
  This call does not actually register the event with libevent.
571
  Instead, it places the Session onto a queue and signals libevent by writing
572
  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
573
  be invoked which will find the Session on the queue and add it to libevent.
574
*/
575
576
void libevent_session_add(Session* session)
577
{
578
  char c= 0;
579
  session_scheduler *scheduler= (session_scheduler *)session->scheduler;
580
  assert(scheduler);
581
582
  pthread_mutex_lock(&LOCK_session_add);
583
  /* queue for libevent */
916.1.7 by Padraig O'Sullivan
Giving session_scheduler class member a better name based on re-factoring.
584
  sessions_need_adding.push_front(scheduler->session);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
585
  /* notify libevent */
586
  assert(write(session_add_pipe[1], &c, sizeof(c))==sizeof(c));
587
  pthread_mutex_unlock(&LOCK_session_add);
588
}
589
590
960.1.1 by Monty Taylor
First pass at scheduler plugin.
591
592
static int init(void *p)
593
{
594
  assert(size != 0);
960.1.5 by Monty Taylor
Cleaned up pool_of_threads scheduler just a bit.
595
596
  void **plugin= static_cast<void **>(p);
597
598
  Pool_of_threads_scheduler *sched=
599
    new Pool_of_threads_scheduler(size);
600
  if (sched->libevent_init())
601
  {
602
    delete sched;
603
    return 1;
604
  }
605
606
  *plugin= static_cast<void *>(sched);
607
608
  return 0;
960.1.1 by Monty Taylor
First pass at scheduler plugin.
609
}
610
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
611
/**
612
  Wait until all pool threads have been deleted for clean shutdown
613
*/
614
960.1.1 by Monty Taylor
First pass at scheduler plugin.
615
static int deinit(void *p)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
616
{
960.1.1 by Monty Taylor
First pass at scheduler plugin.
617
  Scheduler *sched= static_cast<Scheduler *>(p);
618
  delete sched;
874 by Brian Aker
Refactor out function indirection in pool_of_threads.
619
620
  return 0;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
621
}
622
623
/* 
624
  The defaults here were picked based on what I see (aka Brian). They should
625
  be vetted across a larger audience.
626
*/
627
static DRIZZLE_SYSVAR_UINT(size, size,
628
                           PLUGIN_VAR_RQCMDARG,
629
                           N_("Size of Pool."),
630
                           NULL, NULL, 8, 1, 1024, 0);
631
632
static struct st_mysql_sys_var* system_variables[]= {
633
  DRIZZLE_SYSVAR(size),
634
  NULL,
635
};
636
813.2.1 by Toru Maesaka
Renamed mysql_declare_plugin to drizzle_declare_plugin
637
drizzle_declare_plugin(pool_of_threads)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
638
{
639
  DRIZZLE_SCHEDULING_PLUGIN,
640
  "pool_of_threads",
641
  "0.1",
642
  "Brian Aker",
643
  "Pool of Threads Scheduler",
644
  PLUGIN_LICENSE_GPL,
645
  init, /* Plugin Init */
646
  deinit, /* Plugin Deinit */
647
  NULL,   /* status variables */
648
  system_variables,   /* system variables */
649
  NULL    /* config options */
650
}
813.2.2 by Toru Maesaka
Renamed mysql_declare_plugin_end to drizzle_declare_plugin_end
651
drizzle_declare_plugin_end;