19
19
#include <boost/program_options.hpp>
20
20
#include <drizzled/module/option_map.h>
21
21
#include <drizzled/errmsg_print.h>
22
#include "drizzled/session.h"
23
#include "drizzled/session/cache.h"
25
#include <boost/thread.hpp>
26
#include <boost/bind.hpp>
28
23
namespace po= boost::program_options;
29
24
using namespace std;
30
25
using namespace drizzled;
32
27
/* Configuration variables. */
33
typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
34
static max_threads_constraint max_threads;
28
static uint32_t max_threads;
31
static MultiThreadScheduler *scheduler= NULL;
38
35
extern size_t my_thread_stack_size;
41
namespace multi_thread {
43
void MultiThreadScheduler::runSession(drizzled::session_id_t id)
46
boost::this_thread::disable_interruption disable_by_default;
47
Session::shared_ptr session(session::Cache::singleton().find(id));
51
std::cerr << "Session killed before thread could execute\n";
54
session->pushInterrupt(&disable_by_default);
56
if (drizzled::internal::my_thread_init())
58
session->disconnect(drizzled::ER_OUT_OF_RESOURCES, true);
59
session->status_var.aborted_connects++;
60
killSessionNow(session);
62
boost::this_thread::at_thread_exit(&internal::my_thread_end);
64
session->thread_stack= (char*) &stack_dummy;
66
killSessionNow(session);
67
// @todo remove hard spin by disconnection the session first from the
69
while (not session.unique()) {}
72
void MultiThreadScheduler::setStackSize()
76
(void) pthread_attr_init(&attr);
78
/* Get the thread stack size that the OS will use and make sure
79
that we update our global variable. */
80
int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
81
pthread_attr_destroy(&attr);
85
errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to get thread stack size\n"));
86
my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
89
if (my_thread_stack_size == 0)
91
my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
39
* Function to be run as a thread for each session.
43
extern "C" pthread_handler_t session_thread(void *arg);
48
extern "C" pthread_handler_t session_thread(void *arg)
50
Session *session= static_cast<Session*>(arg);
51
MultiThreadScheduler *sched= static_cast<MultiThreadScheduler*>(session->scheduler);
52
sched->runSession(session);
58
bool MultiThreadScheduler::addSession(Session *session)
60
if (thread_count >= max_threads)
95
65
* Solaris will return zero for the stack size in a call to
103
73
if (my_thread_stack_size == 0)
105
74
my_thread_stack_size= 2 * 1024 * 1024;
110
bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
112
if (thread_count >= max_threads)
77
/* Thread stack size of zero means just use the OS default */
78
if (my_thread_stack_size != 0)
80
int err= pthread_attr_setstacksize(&attr, my_thread_stack_size);
84
errmsg_printf(ERRMSG_LVL_ERROR,
85
_("Unable to set thread stack size to %" PRId64 "\n"),
86
static_cast<uint64_t>(my_thread_stack_size));
92
/* Get the thread stack size that the OS will use and make sure
93
that we update our global variable. */
94
int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
98
errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to get thread stack size\n"));
115
103
thread_count.increment();
117
session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
119
if (not session->getThread())
121
thread_count.decrement();
125
if (not session->getThread()->joinable())
105
if (pthread_create(&session->real_id, &attr, session_thread,
106
static_cast<void*>(session)))
127
108
thread_count.decrement();
135
void MultiThreadScheduler::killSession(Session *session)
137
boost_thread_shared_ptr thread(session->getThread());
145
void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
147
killSession(session.get());
116
void MultiThreadScheduler::killSessionNow(Session *session)
148
118
/* Locks LOCK_thread_count and deletes session */
149
119
Session::unlink(session);
150
120
thread_count.decrement();
121
internal::my_thread_end();
123
/* We should never reach this point. */
153
126
MultiThreadScheduler::~MultiThreadScheduler()
155
boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
128
LOCK_thread_count.lock();
156
129
while (thread_count)
158
COND_thread_count.wait(scopedLock);
131
pthread_cond_wait(COND_thread_count.native_handle(), LOCK_thread_count.native_handle());
134
LOCK_thread_count.unlock();
135
(void) pthread_attr_destroy(&attr);
162
} // multi_thread namespace
165
139
static int init(drizzled::module::Context &context)
168
context.add(new multi_thread::MultiThreadScheduler("multi_thread"));
142
const module::option_map &vm= context.getOptions();
143
if (vm.count("max-threads"))
145
if (max_threads > 4096 || max_threads < 1)
147
errmsg_printf(ERRMSG_LVL_ERROR, _("Invalid value for max-threads\n"));
152
scheduler= new MultiThreadScheduler("multi_thread");
153
context.add(scheduler);
158
static DRIZZLE_SYSVAR_UINT(max_threads, max_threads,
160
N_("Maximum number of user threads available."),
161
NULL, NULL, 2048, 1, 4096, 0);
173
163
static void init_options(drizzled::module::option_context &context)
175
165
context("max-threads",
176
po::value<max_threads_constraint>(&max_threads)->default_value(2048),
166
po::value<uint32_t>(&max_threads)->default_value(2048),
177
167
N_("Maximum number of user threads available."));
170
static drizzle_sys_var* sys_variables[]= {
171
DRIZZLE_SYSVAR(max_threads),
180
175
DRIZZLE_DECLARE_PLUGIN
182
177
DRIZZLE_VERSION_ID,