13
13
along with this program; if not, write to the Free Software
14
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
20
#include <drizzled/pthread_globals.h>
21
#include <drizzled/module/option_map.h>
22
#include <drizzled/errmsg_print.h>
16
#include <drizzled/server_includes.h>
17
#include <drizzled/atomics.h>
18
#include <drizzled/gettext.h>
19
#include <drizzled/error.h>
20
#include <drizzled/plugin/scheduler.h>
21
#include <drizzled/connect.h>
22
#include <drizzled/sql_parse.h>
23
23
#include <drizzled/session.h>
24
#include <drizzled/session/cache.h>
25
#include <drizzled/abort_exception.h>
26
#include <drizzled/transaction_services.h>
27
#include <drizzled/gettext.h>
29
#include <boost/thread.hpp>
30
#include <boost/bind.hpp>
31
#include <boost/program_options.hpp>
33
#include "multi_thread.h"
35
namespace po= boost::program_options;
24
#include <drizzled/connect.h>
36
27
using namespace std;
37
using namespace drizzled;
39
/* Configuration variables. */
40
typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
41
static max_threads_constraint max_threads;
45
extern size_t my_thread_stack_size;
48
namespace multi_thread {
50
void MultiThreadScheduler::runSession(drizzled::session_id_t id)
53
boost::this_thread::disable_interruption disable_by_default;
55
Session::shared_ptr session(session::Cache::singleton().find(id));
62
std::cerr << _("Session killed before thread could execute") << endl;
65
session->pushInterrupt(&disable_by_default);
67
if (drizzled::internal::my_thread_init())
69
session->disconnect(drizzled::ER_OUT_OF_RESOURCES);
70
session->status_var.aborted_connects++;
74
boost::this_thread::at_thread_exit(&internal::my_thread_end);
76
session->thread_stack= (char*) &stack_dummy;
80
killSessionNow(session);
82
catch (abort_exception& ex)
84
cout << _("Drizzle has receieved an abort event.") << endl;
85
cout << _("In Function: ") << *::boost::get_error_info<boost::throw_function>(ex) << endl;
86
cout << _("In File: ") << *::boost::get_error_info<boost::throw_file>(ex) << endl;
87
cout << _("On Line: ") << *::boost::get_error_info<boost::throw_line>(ex) << endl;
89
TransactionServices::singleton().sendShutdownEvent(*session.get());
91
// @todo remove hard spin by disconnecting the session first from the
93
while (not session.unique()) {}
96
void MultiThreadScheduler::setStackSize()
100
(void) pthread_attr_init(&attr);
102
/* Get the thread stack size that the OS will use and make sure
103
that we update our global variable. */
104
int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
105
pthread_attr_destroy(&attr);
109
errmsg_printf(error::ERROR, _("Unable to get thread stack size"));
110
my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
113
if (my_thread_stack_size == 0)
115
my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
29
static uint32_t max_threads;
31
class Multi_thread_scheduler: public Scheduler
33
drizzled::atomic<uint32_t> thread_count;
34
pthread_attr_t multi_thread_attrib;
37
Multi_thread_scheduler(uint32_t threads): Scheduler(threads)
40
/* Parameter for threads created for connections */
41
(void) pthread_attr_init(&multi_thread_attrib);
42
(void) pthread_attr_setdetachstate(&multi_thread_attrib,
43
PTHREAD_CREATE_DETACHED);
44
pthread_attr_setscope(&multi_thread_attrib, PTHREAD_SCOPE_SYSTEM);
46
struct sched_param tmp_sched_param;
48
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
49
tmp_sched_param.sched_priority= WAIT_PRIOR;
50
(void)pthread_attr_setschedparam(&multi_thread_attrib, &tmp_sched_param);
54
~Multi_thread_scheduler()
56
(void) pthread_mutex_lock(&LOCK_thread_count);
59
pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
61
(void) pthread_mutex_unlock(&LOCK_thread_count);
63
pthread_attr_destroy(&multi_thread_attrib);
66
virtual bool add_connection(Session *session)
72
if ((error= pthread_create(&session->real_id, &multi_thread_attrib, handle_one_connection, static_cast<void*>(session))))
119
* Solaris will return zero for the stack size in a call to
120
* pthread_attr_getstacksize() to indicate that the OS default stack
121
* size is used. We need an actual value in my_thread_stack_size so that
122
* check_stack_overrun() will work. The Solaris man page for the
123
* pthread_attr_getstacksize() function says that 2M is used for 64-bit
124
* processes. We'll explicitly set it here to make sure that is what
127
if (my_thread_stack_size == 0)
129
my_thread_stack_size= 2 * 1024 * 1024;
134
bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
136
if (thread_count >= max_threads)
139
thread_count.increment();
142
session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
144
catch (std::exception&)
146
thread_count.decrement();
150
if (not session->getThread())
152
thread_count.decrement();
156
if (not session->getThread()->joinable())
158
thread_count.decrement();
166
void MultiThreadScheduler::killSession(Session *session)
168
boost_thread_shared_ptr thread(session->getThread());
176
void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
178
killSession(session.get());
180
session->disconnect();
182
/* Locks LOCK_thread_count and deletes session */
183
Session::unlink(session);
184
thread_count.decrement();
187
MultiThreadScheduler::~MultiThreadScheduler()
189
boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
192
COND_thread_count.wait(scopedLock);
196
} // multi_thread namespace
199
static int init(drizzled::module::Context &context)
202
context.add(new multi_thread::MultiThreadScheduler("multi_thread"));
207
static void init_options(drizzled::module::option_context &context)
209
context("max-threads",
210
po::value<max_threads_constraint>(&max_threads)->default_value(2048),
211
_("Maximum number of user threads available."));
214
DRIZZLE_DECLARE_PLUGIN
80
End connection, in the case where we are using 'no-threads'
83
virtual bool end_thread(Session *session, bool)
85
unlink_session(session); /* locks LOCK_thread_count and deletes session */
91
return true; // We should never reach this point
94
virtual uint32_t count(void)
100
class MultiThreadFactory : public SchedulerFactory
103
MultiThreadFactory() : SchedulerFactory("multi_thread")
105
addAlias("multi-thread");
108
~MultiThreadFactory() { if (scheduler != NULL) delete scheduler; }
109
Scheduler *operator() ()
111
if (scheduler == NULL)
112
scheduler= new Multi_thread_scheduler(max_threads);
116
static MultiThreadFactory *factory= NULL;
118
static int init(PluginRegistry &registry)
120
factory= new MultiThreadFactory();
121
registry.add(factory);
125
static int deinit(PluginRegistry &registry)
129
registry.remove(factory);
135
static DRIZZLE_SYSVAR_UINT(max_threads, max_threads,
137
N_("Maximum number of user threads available."),
138
NULL, NULL, 2048, 1, 4048, 0);
140
static struct st_mysql_sys_var* system_variables[]= {
141
DRIZZLE_SYSVAR(max_threads),
145
drizzle_declare_plugin(multi_thread)
220
150
"One Thread Per Session Scheduler",
221
151
PLUGIN_LICENSE_GPL,
222
152
init, /* Plugin Init */
224
init_options /* config options */
153
deinit, /* Plugin Deinit */
154
NULL, /* status variables */
155
system_variables, /* system variables */
156
NULL /* config options */
226
DRIZZLE_DECLARE_PLUGIN_END;
158
drizzle_declare_plugin_end;