~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/multi_thread/multi_thread.cc

  • Committer: Monty Taylor
  • Date: 2010-09-28 07:45:44 UTC
  • mto: (1799.1.2 build)
  • mto: This revision was merged to the branch mainline in revision 1800.
  • Revision ID: mordred@inaugust.com-20100928074544-s3ujnv6s8wro74l2
Added BSD copying file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
#include <boost/program_options.hpp>
20
20
#include <drizzled/module/option_map.h>
21
21
#include <drizzled/errmsg_print.h>
22
 
#include "drizzled/session.h"
23
 
#include "drizzled/session/cache.h"
24
22
 
25
23
#include <boost/thread.hpp>
26
24
#include <boost/bind.hpp>
30
28
using namespace drizzled;
31
29
 
32
30
/* Configuration variables. */
33
 
typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
34
 
static max_threads_constraint max_threads;
 
31
static uint32_t max_threads;
 
32
 
 
33
/* Globals (TBR) */
 
34
static MultiThreadScheduler *scheduler= NULL;
35
35
 
36
36
namespace drizzled
37
37
{
38
38
  extern size_t my_thread_stack_size;
39
39
}
40
40
 
41
 
namespace multi_thread {
42
 
 
43
 
void MultiThreadScheduler::runSession(drizzled::session_id_t id)
 
41
void MultiThreadScheduler::runSession(drizzled::Session *session)
44
42
{
45
 
  char stack_dummy;
46
 
  boost::this_thread::disable_interruption disable_by_default;
47
 
  Session::shared_ptr session(session::Cache::singleton().find(id));
48
 
 
49
 
  if (not session)
50
 
  {
51
 
    std::cerr << "Session killed before thread could execute\n";
52
 
    return;
53
 
  }
54
 
  session->pushInterrupt(&disable_by_default);
55
 
 
56
43
  if (drizzled::internal::my_thread_init())
57
44
  {
58
45
    session->disconnect(drizzled::ER_OUT_OF_RESOURCES, true);
61
48
  }
62
49
  boost::this_thread::at_thread_exit(&internal::my_thread_end);
63
50
 
64
 
  session->thread_stack= (char*) &stack_dummy;
 
51
  session->thread_stack= (char*) &session;
65
52
  session->run();
66
53
  killSessionNow(session);
67
 
  // @todo remove hard spin by disconnecting the session first from the
68
 
  // thread.
69
 
  while (not session.unique()) {}
70
54
}
71
55
 
72
56
void MultiThreadScheduler::setStackSize()
107
91
#endif
108
92
}
109
93
 
110
 
bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
 
94
bool MultiThreadScheduler::addSession(Session *session)
111
95
{
112
96
  if (thread_count >= max_threads)
113
97
    return true;
114
98
 
115
99
  thread_count.increment();
116
100
 
117
 
  session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
118
 
 
119
 
  if (not session->getThread())
120
 
  {
121
 
    thread_count.decrement();
122
 
    return true;
123
 
  }
124
 
 
125
 
  if (not session->getThread()->joinable())
 
101
  boost::thread new_thread(boost::bind(&MultiThreadScheduler::runSession, this, session));
 
102
 
 
103
  if (not new_thread.joinable())
126
104
  {
127
105
    thread_count.decrement();
128
106
    return true;
132
110
}
133
111
 
134
112
 
135
 
void MultiThreadScheduler::killSession(Session *session)
136
 
{
137
 
  boost_thread_shared_ptr thread(session->getThread());
138
 
 
139
 
  if (thread)
140
 
  {
141
 
    thread->interrupt();
142
 
  }
143
 
}
144
 
 
145
 
void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
146
 
{
147
 
  killSession(session.get());
 
113
void MultiThreadScheduler::killSessionNow(Session *session)
 
114
{
148
115
  /* Locks LOCK_thread_count and deletes session */
149
116
  Session::unlink(session);
150
117
  thread_count.decrement();
152
119
 
153
120
MultiThreadScheduler::~MultiThreadScheduler()
154
121
{
155
 
  boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
 
122
  boost::mutex::scoped_lock scopedLock(LOCK_thread_count);
156
123
  while (thread_count)
157
124
  {
158
125
    COND_thread_count.wait(scopedLock);
159
126
  }
160
127
}
161
128
 
162
 
} // multi_thread namespace
163
 
 
164
129
  
165
130
static int init(drizzled::module::Context &context)
166
131
{
167
132
  
168
 
  context.add(new multi_thread::MultiThreadScheduler("multi_thread"));
 
133
  const module::option_map &vm= context.getOptions();
 
134
  if (vm.count("max-threads"))
 
135
  {
 
136
    if (max_threads > 4096 || max_threads < 1)
 
137
    {
 
138
      errmsg_printf(ERRMSG_LVL_ERROR, _("Invalid value for max-threads\n"));
 
139
      exit(-1);
 
140
    }
 
141
  }
 
142
 
 
143
  scheduler= new MultiThreadScheduler("multi_thread");
 
144
  context.add(scheduler);
169
145
 
170
146
  return 0;
171
147
}
172
148
 
 
149
static DRIZZLE_SYSVAR_UINT(max_threads, max_threads,
 
150
                           PLUGIN_VAR_RQCMDARG,
 
151
                           N_("Maximum number of user threads available."),
 
152
                           NULL, NULL, 2048, 1, 4096, 0);
 
153
 
173
154
static void init_options(drizzled::module::option_context &context)
174
155
{
175
156
  context("max-threads",
176
 
          po::value<max_threads_constraint>(&max_threads)->default_value(2048),
 
157
          po::value<uint32_t>(&max_threads)->default_value(2048),
177
158
          N_("Maximum number of user threads available."));
178
159
}
179
160
 
 
161
static drizzle_sys_var* sys_variables[]= {
 
162
  DRIZZLE_SYSVAR(max_threads),
 
163
  NULL
 
164
};
 
165
 
180
166
DRIZZLE_DECLARE_PLUGIN
181
167
{
182
168
  DRIZZLE_VERSION_ID,
186
172
  "One Thread Per Session Scheduler",
187
173
  PLUGIN_LICENSE_GPL,
188
174
  init, /* Plugin Init */
189
 
  NULL,   /* system variables */
 
175
  sys_variables,   /* system variables */
190
176
  init_options    /* config options */
191
177
}
192
178
DRIZZLE_DECLARE_PLUGIN_END;