~drizzle-trunk/drizzle/development

1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
1089.12.4 by Biping Meng
Several refinements on comments
2
 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 * Copyright (C) 2006 MySQL AB
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
5
 * Copyright (C) 2009 Sun Microsystems
1089.12.4 by Biping Meng
Several refinements on comments
6
 *
7
 * This program is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; version 2 of the License.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19
 */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
20
1241.9.36 by Monty Taylor
ZOMG. I deleted drizzled/server_includes.h.
21
#include "config.h"
1241.9.1 by Monty Taylor
Removed global.h. Fixed all the headers.
22
#include <fcntl.h>
1152.1.5 by Brian Aker
Remove Factory/make scheduler work like everything else.
23
#include <plugin/pool_of_threads/pool_of_threads.h>
1241.9.31 by Monty Taylor
Moved global pthread variables into their own header.
24
#include "drizzled/pthread_globals.h"
1241.9.64 by Monty Taylor
Moved remaining non-public portions of mysys and mystrings to drizzled/internal.
25
#include "drizzled/internal/my_pthread.h"
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
26
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
27
using namespace std;
971.3.64 by Eric Day
Cleaned up Scheduler plugin, moved more code to the schedular plugins, reworked some functions to be methods in Session, removed some dead code.
28
using namespace drizzled;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
29
1152.1.5 by Brian Aker
Remove Factory/make scheduler work like everything else.
30
/* Global's (TBR) */
31
static PoolOfThreadsScheduler *scheduler= NULL;
32
1089.12.4 by Biping Meng
Several refinements on comments
33
/**
34
 * Set this to true to trigger killing of all threads in the pool
35
 */
960.1.1 by Monty Taylor
First pass at scheduler plugin.
36
static volatile bool kill_pool_threads= false;
37
929.1.1 by Brian Aker
Push thread count out to the scheduler.
38
static volatile uint32_t created_threads= 0;
1110.1.5 by Monty Taylor
Renamed PluginRegistry to plugin::Registry.
39
static int deinit(drizzled::plugin::Registry &registry);
874 by Brian Aker
Refactor out function indirection in pool_of_threads.
40
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
41
static struct event session_add_event;
42
static struct event session_kill_event;
43
1089.12.7 by Biping Meng
sessions_to_be_killed added in order to avoid searching when dealing with session killing notification
44
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
45
static int session_add_pipe[2]; /* pipe to signal add a connection to libevent*/
46
static int session_kill_pipe[2]; /* pipe to signal kill a connection in libevent */
47
48
49
static bool libevent_needs_immediate_processing(Session *session);
50
static void libevent_connection_close(Session *session);
51
void libevent_session_add(Session* session);
52
bool libevent_should_close_connection(Session* session);
779.3.23 by Monty Taylor
More fixy-fixes.
53
extern "C" {
1241.9.62 by Monty Taylor
Removed plugin/myisam/myisam.h from session.h
54
  void *libevent_thread_proc(void *arg);
779.3.23 by Monty Taylor
More fixy-fixes.
55
  void libevent_io_callback(int Fd, short Operation, void *ctx);
56
  void libevent_add_session_callback(int Fd, short Operation, void *ctx);
57
  void libevent_kill_session_callback(int Fd, short Operation, void *ctx);
58
}
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
59
60
static uint32_t size= 0;
61
1089.12.4 by Biping Meng
Several refinements on comments
62
/**
 * @brief
 *  Create a pipe and set both ends to non-blocking.
 *
 * @details
 *  If the pipe is created but switching either end to O_NONBLOCK fails,
 *  both descriptors are closed again and pipe_fds[] is reset to -1, so a
 *  failed call neither leaks descriptors nor leaves stale ones behind.
 *
 * @param[out] pipe_fds
 *  Two-element array; on success [0] holds the read end and [1] the write
 *  end of the new pipe.
 *
 * @return
 *  True if there is an error.
 */
static bool init_pipe(int pipe_fds[])
{
  if (pipe(pipe_fds) < 0)
    return true;

  for (int end= 0; end < 2; end++)
  {
    int flags= fcntl(pipe_fds[end], F_GETFL);
    if (flags == -1 ||
        fcntl(pipe_fds[end], F_SETFL, flags | O_NONBLOCK) == -1)
    {
      /* Do not leak the freshly created descriptors on a partial failure. */
      (void) close(pipe_fds[0]);
      (void) close(pipe_fds[1]);
      pipe_fds[0]= -1;
      pipe_fds[1]= -1;
      return true;
    }
  }

  return false;
}
77
78
79
80
81
1089.12.4 by Biping Meng
Several refinements on comments
82
/**
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
83
 * @brief
84
 *  This is called when data is ready on the socket.
1089.12.4 by Biping Meng
Several refinements on comments
85
 *
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
86
 * @details
1089.12.4 by Biping Meng
Several refinements on comments
87
 *  This is only called by the thread that owns LOCK_event_loop.
88
 *
89
 *  We add the session that got the data to sessions_need_processing, and
90
 *  cause the libevent event_loop() to terminate. Then this same thread will
91
 *  return from event_loop and pick the session value back up for
92
 *  processing.
93
 */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
94
void libevent_io_callback(int, short, void *ctx)
95
{
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
96
  Session *session= reinterpret_cast<Session*>(ctx);
97
  session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
98
  assert(sched);
99
  PoolOfThreadsScheduler *pot_scheduler= static_cast<PoolOfThreadsScheduler *>(session->scheduler);
100
  pot_scheduler->doIO(sched);
101
}
102
103
/**
 * @brief
 *  Move a session whose socket became readable from the waiting set to the
 *  processing queue.
 *
 * @details
 *  The caller must own LOCK_event_loop (asserted).
 *
 * @param sched  Per-session scheduler data; sched->session is the session
 *               being moved.
 */
void PoolOfThreadsScheduler::doIO(session_scheduler *sched)
{
  safe_mutex_assert_owner(&LOCK_event_loop);

  /* No longer waiting for I/O ... */
  sessions_waiting_for_io.erase(sched->session);

  /* ... it is now ready to be handled by a pool thread. */
  sessions_need_processing.push(sched->session);
}
1089.12.4 by Biping Meng
Several refinements on comments
109
/**
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
110
 * @brief
1089.12.4 by Biping Meng
Several refinements on comments
111
 *  This is called when we have a thread we want to be killed.
112
 *
113
 * @details
114
 *  This is only called by the thread that owns LOCK_event_loop.
115
 */
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
116
void libevent_kill_session_callback(int Fd, short, void *ctx)
117
{
118
  PoolOfThreadsScheduler *pot_scheduler=
119
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
120
121
  pot_scheduler->killSession(Fd);
122
}
123
124
void PoolOfThreadsScheduler::killSession(int Fd)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
125
{
126
  safe_mutex_assert_owner(&LOCK_event_loop);
1089.12.10 by Biping Meng
Some refinements on comments
127
  /*
128
   For pending events clearing
129
  */
1089.12.8 by Biping Meng
If libevent_add_session is called when some thread is executing libevent_add_session_callback, newly connected sessions are being processed in *this* event loop. But new characters writen into the event pipe will not be cleared in this batch. So we choose to clear the pending events at the end of libevent_add_session routine.
130
  char c;
131
  int count= 0;
1089.12.7 by Biping Meng
sessions_to_be_killed added in order to avoid searching when dealing with session killing notification
132
133
  pthread_mutex_lock(&LOCK_session_kill);
1089.12.12 by Biping Meng
Some tiny cleanups
134
  while (! sessions_to_be_killed.empty())
1089.12.7 by Biping Meng
sessions_to_be_killed added in order to avoid searching when dealing with session killing notification
135
  {
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
136
1089.12.10 by Biping Meng
Some refinements on comments
137
    /*
138
     Fetch a session from the queue
139
    */
1089.12.7 by Biping Meng
sessions_to_be_killed added in order to avoid searching when dealing with session killing notification
140
    Session* session= sessions_to_be_killed.front();
141
    pthread_mutex_unlock(&LOCK_session_kill);
142
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
143
    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
1152.1.7 by Brian Aker
Fix for shadow issues.
144
    assert(sched);
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
145
1089.12.10 by Biping Meng
Some refinements on comments
146
    /*
147
     Delete from libevent and add to the processing queue.
148
    */
1152.1.7 by Brian Aker
Fix for shadow issues.
149
    event_del(&sched->io_event);
1089.12.10 by Biping Meng
Some refinements on comments
150
    /*
151
     Remove from the sessions_waiting_for_io set
152
    */
1089.12.7 by Biping Meng
sessions_to_be_killed added in order to avoid searching when dealing with session killing notification
153
    sessions_waiting_for_io.erase(session);
1089.12.10 by Biping Meng
Some refinements on comments
154
    /*
155
     Push into the sessions_need_processing; the kill action will be
156
     performed out of the event loop
157
    */
1152.1.7 by Brian Aker
Fix for shadow issues.
158
    sessions_need_processing.push(sched->session);
1089.12.9 by Biping Meng
Bug causing assertion failure fixed
159
1089.12.7 by Biping Meng
sessions_to_be_killed added in order to avoid searching when dealing with session killing notification
160
    pthread_mutex_lock(&LOCK_session_kill);
1089.12.10 by Biping Meng
Some refinements on comments
161
    /*
162
     Pop until this session is already processed
163
    */
1089.12.9 by Biping Meng
Bug causing assertion failure fixed
164
    sessions_to_be_killed.pop();
1089.12.7 by Biping Meng
sessions_to_be_killed added in order to avoid searching when dealing with session killing notification
165
  }
166
  
1089.12.10 by Biping Meng
Some refinements on comments
167
  /*
168
   Clear the pending events 
169
   One and only one charactor should be in the pipe
170
  */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
171
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
172
  {
1089.12.7 by Biping Meng
sessions_to_be_killed added in order to avoid searching when dealing with session killing notification
173
    count++;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
174
  }
1089.12.7 by Biping Meng
sessions_to_be_killed added in order to avoid searching when dealing with session killing notification
175
  assert(count == 1);
176
  pthread_mutex_unlock(&LOCK_session_kill);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
177
}
178
179
1089.12.4 by Biping Meng
Several refinements on comments
180
/**
181
 * @brief
182
 *  This is used to add connections to the pool. This callback is invoked
183
 *  from the libevent event_loop() call whenever the session_add_pipe[1]
184
 *  pipe has a byte written to it.
185
 *
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
186
 * @details
1089.12.4 by Biping Meng
Several refinements on comments
187
 *  This is only called by the thread that owns LOCK_event_loop.
188
 */
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
189
void libevent_add_session_callback(int Fd, short, void *ctx)
190
{
191
  PoolOfThreadsScheduler *pot_scheduler=
192
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
193
  pot_scheduler->addSession(Fd);
194
}
195
196
void PoolOfThreadsScheduler::addSession(int Fd)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
197
{
198
  safe_mutex_assert_owner(&LOCK_event_loop);
1089.12.10 by Biping Meng
Some refinements on comments
199
  /*
200
   For pending events clearing
201
  */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
202
  char c;
1089.12.8 by Biping Meng
If libevent_add_session is called when some thread is executing libevent_add_session_callback, newly connected sessions are being processed in *this* event loop. But new characters writen into the event pipe will not be cleared in this batch. So we choose to clear the pending events at the end of libevent_add_session routine.
203
  int count= 0;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
204
205
  pthread_mutex_lock(&LOCK_session_add);
1089.12.13 by Biping Meng
Made up the missing space
206
  while (! sessions_need_adding.empty())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
207
  {
1089.12.10 by Biping Meng
Some refinements on comments
208
    /*
209
     Pop the first session off the queue 
210
    */
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
211
    Session* session= sessions_need_adding.front();
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
212
    pthread_mutex_unlock(&LOCK_session_add);
213
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
214
    session_scheduler *sched= static_cast<session_scheduler *>(session->scheduler_arg);
215
    assert(sched);
216
217
1152.1.7 by Brian Aker
Fix for shadow issues.
218
    if (!sched->logged_in || libevent_should_close_connection(session))
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
219
    {
1089.12.10 by Biping Meng
Some refinements on comments
220
      /*
221
       Add session to sessions_need_processing queue. If it needs closing
222
       we'll close it outside of event_loop().
223
      */
1152.1.7 by Brian Aker
Fix for shadow issues.
224
      sessions_need_processing.push(sched->session);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
225
    }
226
    else
227
    {
228
      /* Add to libevent */
1152.1.7 by Brian Aker
Fix for shadow issues.
229
      if (event_add(&sched->io_event, NULL))
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
230
      {
231
        errmsg_printf(ERRMSG_LVL_ERROR, _("event_add error in libevent_add_session_callback\n"));
232
        libevent_connection_close(session);
233
      }
234
      else
235
      {
1152.1.7 by Brian Aker
Fix for shadow issues.
236
        sessions_waiting_for_io.insert(sched->session);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
237
      }
238
    }
1089.12.9 by Biping Meng
Bug causing assertion failure fixed
239
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
240
    pthread_mutex_lock(&LOCK_session_add);
1089.12.10 by Biping Meng
Some refinements on comments
241
    /*
242
     Pop until this session is already processed
243
    */
1089.12.9 by Biping Meng
Bug causing assertion failure fixed
244
    sessions_need_adding.pop();
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
245
  }
1089.12.8 by Biping Meng
If libevent_add_session is called when some thread is executing libevent_add_session_callback, newly connected sessions are being processed in *this* event loop. But new characters writen into the event pipe will not be cleared in this batch. So we choose to clear the pending events at the end of libevent_add_session routine.
246
1089.12.10 by Biping Meng
Some refinements on comments
247
  /*
248
   Clear the pending events 
249
   One and only one charactor should be in the pipe
250
  */
1089.12.8 by Biping Meng
If libevent_add_session is called when some thread is executing libevent_add_session_callback, newly connected sessions are being processed in *this* event loop. But new characters writen into the event pipe will not be cleared in this batch. So we choose to clear the pending events at the end of libevent_add_session routine.
251
  while (read(Fd, &c, sizeof(c)) == sizeof(c))
252
  {
253
    count++;
254
  }
255
  assert(count == 1);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
256
  pthread_mutex_unlock(&LOCK_session_add);
257
}
258
1089.12.4 by Biping Meng
Several refinements on comments
259
/**
260
 * @brief 
261
 *  Close and delete a connection.
262
 */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
263
static void libevent_connection_close(Session *session)
264
{
1152.1.7 by Brian Aker
Fix for shadow issues.
265
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
266
  assert(sched);
1089.12.10 by Biping Meng
Some refinements on comments
267
  session->killed= Session::KILL_CONNECTION;    /* Avoid error messages */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
268
971.6.8 by Eric Day
Merged trunk.
269
  if (session->client->getFileDescriptor() >= 0) /* not already closed */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
270
  {
934.2.6 by Jay Pipes
This changeset removes a few more C functions from sql_connect.cc/connect.h
271
    session->disconnect(0, true);
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
272
  }
1152.1.7 by Brian Aker
Fix for shadow issues.
273
  sched->thread_detach();
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
274
  
1152.1.7 by Brian Aker
Fix for shadow issues.
275
  delete sched;
971.3.64 by Eric Day
Cleaned up Scheduler plugin, moved more code to the schedular plugins, reworked some functions to be methods in Session, removed some dead code.
276
  session->scheduler_arg= NULL;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
277
1241.9.12 by Monty Taylor
Trims more out of server_includes.h.
278
  Session::unlink(session);   /* locks LOCK_thread_count and deletes session */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
279
280
  return;
281
}
282
283
1089.12.4 by Biping Meng
Several refinements on comments
284
/**
285
 * @brief 
286
 *  Checks if a session should be closed.
287
 *  
288
 * @retval true this session should be closed.  
289
 * @retval false not to be closed.
290
 */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
291
bool libevent_should_close_connection(Session* session)
292
{
971.6.1 by Eric Day
Renamed Protocol to Client, cleaned up some unnecessary methods along the way.
293
  return session->client->haveError() ||
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
294
         session->killed == Session::KILL_CONNECTION;
295
}
296
297
1089.12.4 by Biping Meng
Several refinements on comments
298
/**
299
 * @brief
300
 *  libevent_thread_proc is the outer loop of each thread in the thread pool.
301
 *  These procs only return/terminate on shutdown (kill_pool_threads ==
302
 *  true).
303
 */
1241.9.62 by Monty Taylor
Removed plugin/myisam/myisam.h from session.h
304
void *libevent_thread_proc(void *ctx)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
305
{
1280.1.10 by Monty Taylor
Put everything in drizzled into drizzled namespace.
306
  if (internal::my_thread_init())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
307
  {
1280.1.10 by Monty Taylor
Put everything in drizzled into drizzled namespace.
308
    internal::my_thread_global_end();
309
    errmsg_printf(ERRMSG_LVL_ERROR, _("libevent_thread_proc: internal::my_thread_init() failed\n"));
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
310
    exit(1);
311
  }
312
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
313
  PoolOfThreadsScheduler *pot_scheduler=
314
    reinterpret_cast<PoolOfThreadsScheduler *>(ctx);
315
  return pot_scheduler->mainLoop();
316
}
317
1241.9.62 by Monty Taylor
Removed plugin/myisam/myisam.h from session.h
318
void *PoolOfThreadsScheduler::mainLoop()
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
319
{
1089.12.10 by Biping Meng
Some refinements on comments
320
  /*
321
   Signal libevent_init() when all threads has been created and are ready
322
   to receive events.
323
  */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
324
  (void) pthread_mutex_lock(&LOCK_thread_count);
325
  created_threads++;
808 by Brian Aker
Move number of threads to use for pool of threads to module. Removed slave
326
  if (created_threads == size)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
327
    (void) pthread_cond_signal(&COND_thread_count);
328
  (void) pthread_mutex_unlock(&LOCK_thread_count);
329
330
  for (;;)
331
  {
332
    Session *session= NULL;
333
    (void) pthread_mutex_lock(&LOCK_event_loop);
334
335
    /* get session(s) to process */
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
336
    while (sessions_need_processing.empty())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
337
    {
338
      if (kill_pool_threads)
339
      {
340
        /* the flag that we should die has been set */
341
        (void) pthread_mutex_unlock(&LOCK_event_loop);
342
        goto thread_exit;
343
      }
344
      event_loop(EVLOOP_ONCE);
345
    }
346
1089.12.3 by Biping Meng
std::list sessions_waiting_for_io changed into std::set for semantical reasons. We would rather prefer unordered_set, and this should be applied when c++09 comes.
347
    /* pop the first session off the queue */
916.1.3 by Padraig O'Sullivan
Replaced the custom list implementation that is used in the pool of threads
348
    session= sessions_need_processing.front();
1089.12.1 by Biping Meng
std::list changed into std::queue
349
    sessions_need_processing.pop();
1152.1.7 by Brian Aker
Fix for shadow issues.
350
    session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
351
352
    (void) pthread_mutex_unlock(&LOCK_event_loop);
353
354
    /* now we process the connection (session) */
355
356
    /* set up the session<->thread links. */
357
    session->thread_stack= (char*) &session;
358
1152.1.7 by Brian Aker
Fix for shadow issues.
359
    if (sched->thread_attach())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
360
    {
361
      libevent_connection_close(session);
362
      continue;
363
    }
364
365
    /* is the connection logged in yet? */
1152.1.7 by Brian Aker
Fix for shadow issues.
366
    if (!sched->logged_in)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
367
    {
971.3.64 by Eric Day
Cleaned up Scheduler plugin, moved more code to the schedular plugins, reworked some functions to be methods in Session, removed some dead code.
368
      if (session->authenticate())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
369
      {
370
        /* Failed to log in */
371
        libevent_connection_close(session);
372
        continue;
373
      }
374
      else
375
      {
376
        /* login successful */
1152.1.7 by Brian Aker
Fix for shadow issues.
377
        sched->logged_in= true;
934.2.6 by Jay Pipes
This changeset removes a few more C functions from sql_connect.cc/connect.h
378
        session->prepareForQueries();
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
379
        if (!libevent_needs_immediate_processing(session))
380
          continue; /* New connection is now waiting for data in libevent*/
381
      }
382
    }
383
384
    do
385
    {
386
      /* Process a query */
934.2.8 by Jay Pipes
Refactors the do_command() function out of the sql_parse.cc stuff and implements it as a member method, executeStatement() on the Session object.
387
      if (! session->executeStatement())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
388
      {
389
        libevent_connection_close(session);
390
        break;
391
      }
392
    } while (libevent_needs_immediate_processing(session));
929.1.1 by Brian Aker
Push thread count out to the scheduler.
393
394
    if (kill_pool_threads) /* the flag that we should die has been set */
395
      goto thread_exit;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
396
  }
397
398
thread_exit:
399
  (void) pthread_mutex_lock(&LOCK_thread_count);
929.1.1 by Brian Aker
Push thread count out to the scheduler.
400
  created_threads--;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
401
  pthread_cond_broadcast(&COND_thread_count);
402
  (void) pthread_mutex_unlock(&LOCK_thread_count);
1280.1.10 by Monty Taylor
Put everything in drizzled into drizzled namespace.
403
  internal::my_thread_end();
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
404
  pthread_exit(0);
929.1.1 by Brian Aker
Push thread count out to the scheduler.
405
406
  return NULL;                               /* purify: deadcode */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
407
}
408
409
1089.12.4 by Biping Meng
Several refinements on comments
410
/**
411
 * @brief
412
 *  Checks if a session needs immediate processing
413
 *
1089.12.10 by Biping Meng
Some refinements on comments
414
 * @retval true the session needs immediate processing 
415
 * @retval false if not, and is detached from the thread waiting for another
416
 * adding. The naming of the function is misleading in this case; it
417
 * actually does more than just checking if immediate processing is needed.
1089.12.4 by Biping Meng
Several refinements on comments
418
 */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
419
static bool libevent_needs_immediate_processing(Session *session)
420
{
1152.1.7 by Brian Aker
Fix for shadow issues.
421
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
422
423
  if (libevent_should_close_connection(session))
424
  {
425
    libevent_connection_close(session);
426
    return false;
427
  }
1089.12.10 by Biping Meng
Some refinements on comments
428
  /*
429
   If more data in the socket buffer, return true to process another command.
430
  
431
   Note: we cannot add for event processing because the whole request
432
   might already be buffered and we wouldn't receive an event. This is
433
   indeed the root of the reason of low performace. Need to be changed
434
   when nonblocking Protocol is finished.
435
  */
971.6.1 by Eric Day
Renamed Protocol to Client, cleaned up some unnecessary methods along the way.
436
  if (session->client->haveMoreData())
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
437
    return true;
438
1152.1.7 by Brian Aker
Fix for shadow issues.
439
  sched->thread_detach();
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
440
  libevent_session_add(session);
441
442
  return false;
443
}
444
445
1089.12.4 by Biping Meng
Several refinements on comments
446
/**
447
 * @brief 
448
 *  Adds a Session to queued for libevent processing.
449
 * 
450
 * @details
451
 *  This call does not actually register the event with libevent.
452
 *  Instead, it places the Session onto a queue and signals libevent by writing
453
 *  a byte into session_add_pipe, which will cause our libevent_add_session_callback to
454
 *  be invoked which will find the Session on the queue and add it to libevent.
455
 */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
456
void libevent_session_add(Session* session)
457
{
1152.1.7 by Brian Aker
Fix for shadow issues.
458
  session_scheduler *sched= (session_scheduler *)session->scheduler_arg;
459
  assert(sched);
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
460
  PoolOfThreadsScheduler *pot_scheduler=
461
    static_cast<PoolOfThreadsScheduler *>(session->scheduler);
462
  pot_scheduler->sessionAddToQueue(sched);
463
}
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
464
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
465
/**
 * @brief
 *  Queues a session for registration with libevent.
 *
 * @details
 *  Under LOCK_session_add, pushes sched->session onto sessions_need_adding.
 *  When the queue was empty beforehand, one byte is written to
 *  session_add_pipe so that the event registered on the pipe's read end
 *  fires. No byte is written when the queue already holds sessions.
 *
 *  A failed or short write() is reported through errmsg_printf in addition
 *  to the debug-build assert, so builds with NDEBUG do not drop the error
 *  silently.
 *
 * @param[in] sched per-session scheduler state; its session member is queued
 */
void PoolOfThreadsScheduler::sessionAddToQueue(session_scheduler *sched)
{
  const char wakeup= 0;

  pthread_mutex_lock(&LOCK_session_add);
  if (sessions_need_adding.empty())
  {
    /* notify libevent */
    /* write() returns ssize_t: -1 on error, otherwise the byte count */
    const ssize_t written= write(session_add_pipe[1], &wakeup, sizeof(wakeup));
    if (written != static_cast<ssize_t>(sizeof(wakeup)))
    {
      errmsg_printf(ERRMSG_LVL_ERROR,
                    _("write(session_add_pipe) error in sessionAddToQueue\n"));
      assert(written == static_cast<ssize_t>(sizeof(wakeup)));
    }
  }
  /* queue for libevent */
  sessions_need_adding.push(sched->session);
  pthread_mutex_unlock(&LOCK_session_add);
}
479
480
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
481
/**
 * @brief
 *  Constructs the scheduler with empty session queues and prepares the
 *  pthread attribute and mutexes it uses.
 *
 * @details
 *  The thread attribute is set to detached state and system contention
 *  scope, with a scheduling priority of WAIT_PRIOR. No threads, pipes or
 *  libevent events are created here.
 *
 * @param[in] name_arg name passed on to the Scheduler base class
 */
PoolOfThreadsScheduler::PoolOfThreadsScheduler(const char *name_arg)
  : Scheduler(name_arg), sessions_need_adding(), sessions_to_be_killed(),
    sessions_need_processing(), sessions_waiting_for_io()
{
  /* Initialize the scheduler's mutexes. */
  pthread_mutex_init(&LOCK_session_add, NULL);
  pthread_mutex_init(&LOCK_session_kill, NULL);
  pthread_mutex_init(&LOCK_event_loop, NULL);

  /* Setup attribute parameter for session threads. */
  (void) pthread_attr_init(&attr);
  (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);

  /* Zero the whole structure first, then set only the priority. */
  struct sched_param worker_sched_param;
  memset(&worker_sched_param, 0, sizeof(struct sched_param));
  worker_sched_param.sched_priority= WAIT_PRIOR;
  (void) pthread_attr_setschedparam(&attr, &worker_sched_param);
}
501
502
1152.1.5 by Brian Aker
Remove Factory/make scheduler work like everything else.
503
/**
 * @brief
 *  Stops the pool threads and releases the scheduler's resources.
 *
 * @details
 *  Sets kill_pool_threads and then, for as long as created_threads is
 *  non-zero, writes a byte to session_add_pipe to wake the event loop and
 *  waits on COND_thread_count. Afterwards the add/kill events are removed,
 *  both pipes are closed, and the mutexes and the thread attribute are
 *  destroyed.
 */
PoolOfThreadsScheduler::~PoolOfThreadsScheduler()
{
  (void) pthread_mutex_lock(&LOCK_thread_count);

  kill_pool_threads= true;
  while (created_threads)
  {
    /*
     * Wake up the event loop.
     * NOTE(review): the write() result is only checked by assert, so a
     * failed wake-up goes unnoticed when asserts are compiled out.
     */
    char wakeup= 0;
    size_t written= write(session_add_pipe[1], &wakeup, sizeof(wakeup));
    assert(written == sizeof(wakeup));

    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
  }
  (void) pthread_mutex_unlock(&LOCK_thread_count);

  /* Unregister both pipe events before closing the descriptors. */
  event_del(&session_add_event);
  event_del(&session_kill_event);

  close(session_add_pipe[0]);
  close(session_add_pipe[1]);
  close(session_kill_pipe[0]);
  close(session_kill_pipe[1]);

  (void) pthread_mutex_destroy(&LOCK_event_loop);
  (void) pthread_mutex_destroy(&LOCK_session_add);
  (void) pthread_mutex_destroy(&LOCK_session_kill);
  (void) pthread_attr_destroy(&attr);
}
533
534
535
/**
 * @brief
 *  Attaches per-session scheduler state to a session and queues the session
 *  for libevent processing.
 *
 * @param[in] session session to add; its scheduler_arg must be NULL on entry
 *
 * @retval false the session was queued
 * @retval true  the session_scheduler could not be allocated
 */
bool PoolOfThreadsScheduler::addSession(Session *session)
{
  assert(session->scheduler_arg == NULL);

  /*
    NOTE(review): a plain new-expression reports failure by throwing
    std::bad_alloc rather than returning NULL, so the failure branch below
    looks unreachable unless the build disables exceptions - confirm.
  */
  session_scheduler *per_session= new session_scheduler(session);
  if (per_session != NULL)
  {
    session->scheduler_arg= (void *)per_session;
    libevent_session_add(session);
    return false;
  }

  return true;
}
549
550
551
/**
 * @brief
 *  Queues a session to be killed by the event loop.
 *
 * @details
 *  Under LOCK_session_kill, pushes the session onto sessions_to_be_killed.
 *  When the queue was empty beforehand, one byte is written to
 *  session_kill_pipe so that the event registered on the pipe's read end
 *  fires. No byte is written when the queue already holds sessions.
 *
 *  A failed or short write() is reported through errmsg_printf in addition
 *  to the debug-build assert, so builds with NDEBUG do not drop the error
 *  silently.
 *
 * @param[in] session the session to kill
 */
void PoolOfThreadsScheduler::killSession(Session *session)
{
  const char wakeup= 0;

  pthread_mutex_lock(&LOCK_session_kill);

  if (sessions_to_be_killed.empty())
  {
    /*
      Notify libevent with the killing event if this is the first killing
      notification of the batch
    */
    /* write() returns ssize_t: -1 on error, otherwise the byte count */
    const ssize_t written= write(session_kill_pipe[1], &wakeup, sizeof(wakeup));
    if (written != static_cast<ssize_t>(sizeof(wakeup)))
    {
      errmsg_printf(ERRMSG_LVL_ERROR,
                    _("write(session_kill_pipe) error in killSession\n"));
      assert(written == static_cast<ssize_t>(sizeof(wakeup)));
    }
  }

  /*
    Push into the sessions_to_be_killed queue
  */
  sessions_to_be_killed.push(session);
  pthread_mutex_unlock(&LOCK_session_kill);
}
573
574
575
bool PoolOfThreadsScheduler::libevent_init(void)
576
{
577
  uint32_t x;
578
579
  event_init();
580
581
582
  /* Set up the pipe used to add new sessions to the event pool */
583
  if (init_pipe(session_add_pipe))
584
  {
585
    errmsg_printf(ERRMSG_LVL_ERROR,
586
                  _("init_pipe(session_add_pipe) error in libevent_init\n"));
587
    return true;
588
  }
589
  /* Set up the pipe used to kill sessions in the event queue */
590
  if (init_pipe(session_kill_pipe))
591
  {
592
    errmsg_printf(ERRMSG_LVL_ERROR,
593
                  _("init_pipe(session_kill_pipe) error in libevent_init\n"));
594
    close(session_add_pipe[0]);
595
    close(session_add_pipe[1]);
596
    return true;
597
  }
598
  event_set(&session_add_event, session_add_pipe[0], EV_READ|EV_PERSIST,
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
599
            libevent_add_session_callback, this);
1152.1.5 by Brian Aker
Remove Factory/make scheduler work like everything else.
600
  event_set(&session_kill_event, session_kill_pipe[0], EV_READ|EV_PERSIST,
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
601
            libevent_kill_session_callback, this);
1152.1.5 by Brian Aker
Remove Factory/make scheduler work like everything else.
602
603
  if (event_add(&session_add_event, NULL) || event_add(&session_kill_event, NULL))
604
  {
605
    errmsg_printf(ERRMSG_LVL_ERROR, _("session_add_event event_add error in libevent_init\n"));
606
    return true;
607
608
  }
609
  /* Set up the thread pool */
610
  pthread_mutex_lock(&LOCK_thread_count);
611
612
  for (x= 0; x < size; x++)
613
  {
614
    pthread_t thread;
615
    int error;
1228.1.2 by Monty Taylor
Moved STL containers into the PoolOfThreadsScheduler class.
616
    if ((error= pthread_create(&thread, &attr, libevent_thread_proc, this)))
1152.1.5 by Brian Aker
Remove Factory/make scheduler work like everything else.
617
    {
618
      errmsg_printf(ERRMSG_LVL_ERROR, _("Can't create completion port thread (error %d)"),
619
                    error);
620
      pthread_mutex_unlock(&LOCK_thread_count);
621
      return true;
622
    }
623
  }
624
625
  /* Wait until all threads are created */
626
  while (created_threads != size)
627
    pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
628
  pthread_mutex_unlock(&LOCK_thread_count);
629
630
  return false;
631
}
960.1.1 by Monty Taylor
First pass at scheduler plugin.
632
1089.12.10 by Biping Meng
Some refinements on comments
633
634
/**
635
 * @brief
636
 *  Called to initialize the pool of threads scheduler plugin
637
 * 
638
 * @param[in] registry holding the record of the plugins
639
 */
1110.1.5 by Monty Taylor
Renamed PluginRegistry to plugin::Registry.
640
static int init(drizzled::plugin::Registry &registry)
960.1.1 by Monty Taylor
First pass at scheduler plugin.
641
{
642
  assert(size != 0);
960.1.5 by Monty Taylor
Cleaned up pool_of_threads scheduler just a bit.
643
1152.1.5 by Brian Aker
Remove Factory/make scheduler work like everything else.
644
  scheduler= new PoolOfThreadsScheduler("pool_of_threads");
645
  registry.add(scheduler);
960.1.5 by Monty Taylor
Cleaned up pool_of_threads scheduler just a bit.
646
647
  return 0;
960.1.1 by Monty Taylor
First pass at scheduler plugin.
648
}
649
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
650
/**
1089.12.4 by Biping Meng
Several refinements on comments
651
 * @brief
652
 *  Waits until all pool threads have been deleted for clean shutdown
653
 */
1110.1.5 by Monty Taylor
Renamed PluginRegistry to plugin::Registry.
654
static int deinit(drizzled::plugin::Registry &registry)
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
655
{
1152.1.5 by Brian Aker
Remove Factory/make scheduler work like everything else.
656
  registry.remove(scheduler);
657
  delete scheduler;
658
874 by Brian Aker
Refactor out function indirection in pool_of_threads.
659
  return 0;
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
660
}
661
1089.12.10 by Biping Meng
Some refinements on comments
662
/*
663
 The defaults here were picked based on what I see (aka Brian). They should
664
 be vetted across a larger audience.
665
*/
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
666
static DRIZZLE_SYSVAR_UINT(size, size,
667
                           PLUGIN_VAR_RQCMDARG,
668
                           N_("Size of Pool."),
669
                           NULL, NULL, 8, 1, 1024, 0);
670
1280.1.10 by Monty Taylor
Put everything in drizzled into drizzled namespace.
671
/* NULL-terminated list of the system variables this plugin declares. */
static drizzle_sys_var* sys_variables[]=
{
  DRIZZLE_SYSVAR(size),
  NULL,
};
675
1228.1.5 by Monty Taylor
Merged in some naming things.
676
DRIZZLE_DECLARE_PLUGIN
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
677
{
1241.10.2 by Monty Taylor
Added support for embedding the drizzle version number in the plugin file.
678
  DRIZZLE_VERSION_ID,
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
679
  "pool_of_threads",
680
  "0.1",
681
  "Brian Aker",
682
  "Pool of Threads Scheduler",
683
  PLUGIN_LICENSE_GPL,
684
  init, /* Plugin Init */
685
  deinit, /* Plugin Deinit */
686
  NULL,   /* status variables */
1280.1.10 by Monty Taylor
Put everything in drizzled into drizzled namespace.
687
  sys_variables,   /* system variables */
803 by Brian Aker
Refactored all current scheduler to be behind scheduler plugin api.
688
  NULL    /* config options */
689
}
1228.1.5 by Monty Taylor
Merged in some naming things.
690
DRIZZLE_DECLARE_PLUGIN_END;