~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/multi_thread/multi_thread.cc

  • Committer: Monty Taylor
  • Date: 2009-03-22 07:55:08 UTC
  • mto: (960.5.2 mordred)
  • mto: This revision was merged to the branch mainline in revision 961.
  • Revision ID: mordred@inaugust.com-20090322075508-1h34cksq2knhaxc3
Removed global.h from a header.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
 
#include "config.h"
17
 
#include <plugin/multi_thread/multi_thread.h>
18
 
#include "drizzled/pthread_globals.h"
19
 
#include <boost/program_options.hpp>
20
 
#include <drizzled/module/option_map.h>
21
 
#include <drizzled/errmsg_print.h>
22
 
#include "drizzled/session.h"
23
 
#include "drizzled/session/cache.h"
24
 
 
25
 
#include <boost/thread.hpp>
26
 
#include <boost/bind.hpp>
27
 
 
28
 
namespace po= boost::program_options;
 
16
#include <drizzled/server_includes.h>
 
17
#include <drizzled/atomics.h>
 
18
#include <drizzled/gettext.h>
 
19
#include <drizzled/error.h>
 
20
#include <drizzled/plugin_scheduling.h>
 
21
#include <drizzled/connect.h>
 
22
#include <drizzled/sql_parse.h>
 
23
#include <drizzled/session.h>
 
24
#include <drizzled/connect.h>
 
25
#include <string>
 
26
 
29
27
using namespace std;
30
 
using namespace drizzled;
31
 
 
32
 
/* Configuration variables. */
33
 
typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
34
 
static max_threads_constraint max_threads;
35
 
 
36
 
namespace drizzled
37
 
{
38
 
  extern size_t my_thread_stack_size;
39
 
}
40
 
 
41
 
namespace multi_thread {
42
 
 
43
 
void MultiThreadScheduler::runSession(drizzled::session_id_t id)
44
 
{
45
 
  char stack_dummy;
46
 
  boost::this_thread::disable_interruption disable_by_default;
47
 
  Session::shared_ptr session(session::Cache::singleton().find(id));
48
 
 
49
 
  if (not session)
50
 
  {
51
 
    std::cerr << "Session killed before thread could execute\n";
52
 
    return;
53
 
  }
54
 
  session->pushInterrupt(&disable_by_default);
55
 
 
56
 
  if (drizzled::internal::my_thread_init())
57
 
  {
58
 
    session->disconnect(drizzled::ER_OUT_OF_RESOURCES, true);
59
 
    session->status_var.aborted_connects++;
60
 
    killSessionNow(session);
61
 
  }
62
 
  boost::this_thread::at_thread_exit(&internal::my_thread_end);
63
 
 
64
 
  session->thread_stack= (char*) &stack_dummy;
65
 
  session->run();
66
 
  killSessionNow(session);
67
 
  // @todo remove hard spin by disconnection the session first from the
68
 
  // thread.
69
 
  while (not session.unique()) {}
70
 
}
71
 
 
72
 
void MultiThreadScheduler::setStackSize()
73
 
{
74
 
  pthread_attr_t attr;
75
 
 
76
 
  (void) pthread_attr_init(&attr);
77
 
 
78
 
  /* Get the thread stack size that the OS will use and make sure
79
 
    that we update our global variable. */
80
 
  int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
81
 
  pthread_attr_destroy(&attr);
82
 
 
83
 
  if (err != 0)
84
 
  {
85
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to get thread stack size\n"));
86
 
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
87
 
  }
88
 
 
89
 
  if (my_thread_stack_size == 0)
90
 
  {
91
 
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
92
 
  }
93
 
#ifdef __sun
 
28
 
 
29
static uint32_t max_threads;
 
30
 
 
31
class Multi_thread_scheduler: public Scheduler
 
32
{
 
33
  tbb::atomic<uint32_t> thread_count;
 
34
  pthread_attr_t multi_thread_attrib;
 
35
 
 
36
public:
 
37
  Multi_thread_scheduler(uint32_t threads): Scheduler(threads)
 
38
  {
 
39
    thread_count= 0;
 
40
    /* Parameter for threads created for connections */
 
41
    (void) pthread_attr_init(&multi_thread_attrib);
 
42
    (void) pthread_attr_setdetachstate(&multi_thread_attrib,
 
43
                                       PTHREAD_CREATE_DETACHED);
 
44
    pthread_attr_setscope(&multi_thread_attrib, PTHREAD_SCOPE_SYSTEM);
 
45
    {
 
46
      struct sched_param tmp_sched_param;
 
47
  
 
48
      memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
 
49
      tmp_sched_param.sched_priority= WAIT_PRIOR;
 
50
      (void)pthread_attr_setschedparam(&multi_thread_attrib, &tmp_sched_param);
 
51
    }
 
52
  }
 
53
 
 
54
  ~Multi_thread_scheduler()
 
55
  {
 
56
    (void) pthread_mutex_lock(&LOCK_thread_count);
 
57
    while (thread_count)
 
58
    {
 
59
      pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
60
    }
 
61
    (void) pthread_mutex_unlock(&LOCK_thread_count);
 
62
    
 
63
    pthread_attr_destroy(&multi_thread_attrib);
 
64
  }
 
65
 
 
66
  virtual bool add_connection(Session *session)
 
67
  {
 
68
    int error;
 
69
  
 
70
    thread_count++;
 
71
  
 
72
    if ((error= pthread_create(&session->real_id, &multi_thread_attrib, handle_one_connection, static_cast<void*>(session))))
 
73
      return true;
 
74
  
 
75
    return false;
 
76
  }
 
77
  
 
78
  
94
79
  /*
95
 
   * Solaris will return zero for the stack size in a call to
96
 
   * pthread_attr_getstacksize() to indicate that the OS default stack
97
 
   * size is used. We need an actual value in my_thread_stack_size so that
98
 
   * check_stack_overrun() will work. The Solaris man page for the
99
 
   * pthread_attr_getstacksize() function says that 2M is used for 64-bit
100
 
   * processes. We'll explicitly set it here to make sure that is what
101
 
   * will be used.
102
 
   */
103
 
  if (my_thread_stack_size == 0)
104
 
  {
105
 
    my_thread_stack_size= 2 * 1024 * 1024;
106
 
  }
107
 
#endif
108
 
}
109
 
 
110
 
bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
111
 
{
112
 
  if (thread_count >= max_threads)
113
 
    return true;
114
 
 
115
 
  thread_count.increment();
116
 
 
117
 
  session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
118
 
 
119
 
  if (not session->getThread())
120
 
  {
121
 
    thread_count.decrement();
122
 
    return true;
123
 
  }
124
 
 
125
 
  if (not session->getThread()->joinable())
126
 
  {
127
 
    thread_count.decrement();
128
 
    return true;
129
 
  }
130
 
 
131
 
  return false;
132
 
}
133
 
 
134
 
 
135
 
void MultiThreadScheduler::killSession(Session *session)
136
 
{
137
 
  boost_thread_shared_ptr thread(session->getThread());
138
 
 
139
 
  if (thread)
140
 
  {
141
 
    thread->interrupt();
142
 
  }
143
 
}
144
 
 
145
 
void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
146
 
{
147
 
  killSession(session.get());
148
 
  /* Locks LOCK_thread_count and deletes session */
149
 
  Session::unlink(session);
150
 
  thread_count.decrement();
151
 
}
152
 
 
153
 
MultiThreadScheduler::~MultiThreadScheduler()
154
 
{
155
 
  boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
156
 
  while (thread_count)
157
 
  {
158
 
    COND_thread_count.wait(scopedLock);
159
 
  }
160
 
}
161
 
 
162
 
} // multi_thread namespace
163
 
 
164
 
  
165
 
static int init(drizzled::module::Context &context)
166
 
{
167
 
  
168
 
  context.add(new multi_thread::MultiThreadScheduler("multi_thread"));
169
 
 
170
 
  return 0;
171
 
}
172
 
 
173
 
static void init_options(drizzled::module::option_context &context)
174
 
{
175
 
  context("max-threads",
176
 
          po::value<max_threads_constraint>(&max_threads)->default_value(2048),
177
 
          N_("Maximum number of user threads available."));
178
 
}
179
 
 
180
 
DRIZZLE_DECLARE_PLUGIN
181
 
{
182
 
  DRIZZLE_VERSION_ID,
 
80
    End connection, in case when we are using 'no-threads'
 
81
  */
 
82
  
 
83
  virtual bool end_thread(Session *session, bool)
 
84
  {
 
85
    unlink_session(session);   /* locks LOCK_thread_count and deletes session */
 
86
    thread_count--;
 
87
  
 
88
    my_thread_end();
 
89
    pthread_exit(0);
 
90
  
 
91
    return true; // We should never reach this point
 
92
  }
 
93
  
 
94
  virtual uint32_t count(void)
 
95
  {
 
96
    return thread_count;
 
97
  }
 
98
};
 
99
 
 
100
static int init(void *p)
 
101
{
 
102
  Multi_thread_scheduler** sched= static_cast<Multi_thread_scheduler **>(p);
 
103
 
 
104
  *sched= new Multi_thread_scheduler(max_threads);
 
105
 
 
106
  return 0;
 
107
}
 
108
 
 
109
static int deinit(void *p)
 
110
{
 
111
 
 
112
  Multi_thread_scheduler *sched= static_cast<Multi_thread_scheduler *>(p);
 
113
  delete sched;
 
114
 
 
115
  return 0;
 
116
}
 
117
 
 
118
static DRIZZLE_SYSVAR_UINT(max_threads, max_threads,
 
119
                           PLUGIN_VAR_RQCMDARG,
 
120
                           N_("Maximum number of user threads available."),
 
121
                           NULL, NULL, 2048, 1, 4048, 0);
 
122
 
 
123
static struct st_mysql_sys_var* system_variables[]= {
 
124
  DRIZZLE_SYSVAR(max_threads),
 
125
  NULL
 
126
};
 
127
 
 
128
drizzle_declare_plugin(multi_thread)
 
129
{
 
130
  DRIZZLE_SCHEDULING_PLUGIN,
183
131
  "multi_thread",
184
132
  "0.1",
185
133
  "Brian Aker",
186
134
  "One Thread Per Session Scheduler",
187
135
  PLUGIN_LICENSE_GPL,
188
136
  init, /* Plugin Init */
189
 
  NULL,   /* system variables */
190
 
  init_options    /* config options */
 
137
  deinit, /* Plugin Deinit */
 
138
  NULL,   /* status variables */
 
139
  system_variables,   /* system variables */
 
140
  NULL    /* config options */
191
141
}
192
 
DRIZZLE_DECLARE_PLUGIN_END;
 
142
drizzle_declare_plugin_end;