~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/multi_thread/multi_thread.cc

  • Committer: Andrew Hutchings
  • Date: 2010-10-23 10:43:00 UTC
  • mto: (1874.2.1 merge)
  • mto: This revision was merged to the branch mainline in revision 1876.
  • Revision ID: andrew@linuxjedi.co.uk-20101023104300-90rgyfw2eokcvsp3
Make port 4427 the default for client apps

Show diffs side-by-side

added

removed

Lines of Context:
14
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
#include "config.h"
17
 
 
18
 
#include <iostream>
19
 
 
20
 
#include <drizzled/pthread_globals.h>
 
17
#include <plugin/multi_thread/multi_thread.h>
 
18
#include "drizzled/pthread_globals.h"
 
19
#include <boost/program_options.hpp>
21
20
#include <drizzled/module/option_map.h>
22
21
#include <drizzled/errmsg_print.h>
23
 
#include <drizzled/session.h>
24
 
#include <drizzled/session/cache.h>
25
 
#include <drizzled/abort_exception.h>
26
 
#include <drizzled/transaction_services.h>
27
 
#include <drizzled/gettext.h>
28
22
 
29
23
#include <boost/thread.hpp>
30
24
#include <boost/bind.hpp>
31
 
#include <boost/program_options.hpp>
32
 
 
33
 
#include "multi_thread.h"
34
25
 
35
26
namespace po= boost::program_options;
36
27
using namespace std;
37
28
using namespace drizzled;
38
29
 
39
30
/* Configuration variables. */
40
 
typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
41
 
static max_threads_constraint max_threads;
 
31
static uint32_t max_threads;
 
32
 
 
33
/* Global's (TBR) */
 
34
static MultiThreadScheduler *scheduler= NULL;
42
35
 
43
36
namespace drizzled
44
37
{
45
38
  extern size_t my_thread_stack_size;
46
39
}
47
40
 
48
 
namespace multi_thread {
49
 
 
50
 
void MultiThreadScheduler::runSession(drizzled::session_id_t id)
 
41
void MultiThreadScheduler::runSession(drizzled::Session *session)
51
42
{
52
 
  char stack_dummy;
53
 
  boost::this_thread::disable_interruption disable_by_default;
54
 
 
55
 
  Session::shared_ptr session(session::Cache::singleton().find(id));
56
 
 
57
 
  try
 
43
  if (drizzled::internal::my_thread_init())
58
44
  {
59
 
 
60
 
    if (not session)
61
 
    {
62
 
      std::cerr << _("Session killed before thread could execute") << endl;
63
 
      return;
64
 
    }
65
 
    session->pushInterrupt(&disable_by_default);
66
 
 
67
 
    if (drizzled::internal::my_thread_init())
68
 
    {
69
 
      session->disconnect(drizzled::ER_OUT_OF_RESOURCES);
70
 
      session->status_var.aborted_connects++;
71
 
    }
72
 
    else
73
 
    {
74
 
      boost::this_thread::at_thread_exit(&internal::my_thread_end);
75
 
 
76
 
      session->thread_stack= (char*) &stack_dummy;
77
 
      session->run();
78
 
    }
79
 
 
 
45
    session->disconnect(drizzled::ER_OUT_OF_RESOURCES, true);
 
46
    session->status_var.aborted_connects++;
80
47
    killSessionNow(session);
81
48
  }
82
 
  catch (abort_exception& ex)
83
 
  {
84
 
    cout << _("Drizzle has receieved an abort event.") << endl;
85
 
    cout << _("In Function: ") << *::boost::get_error_info<boost::throw_function>(ex) << endl;
86
 
    cout << _("In File: ") << *::boost::get_error_info<boost::throw_file>(ex) << endl;
87
 
    cout << _("On Line: ") << *::boost::get_error_info<boost::throw_line>(ex) << endl;
 
49
  boost::this_thread::at_thread_exit(&internal::my_thread_end);
88
50
 
89
 
    TransactionServices::singleton().sendShutdownEvent(*session.get());
90
 
  }
91
 
  // @todo remove hard spin by disconnection the session first from the
92
 
  // thread.
93
 
  while (not session.unique()) {}
 
51
  session->thread_stack= (char*) &session;
 
52
  session->run();
 
53
  killSessionNow(session);
94
54
}
95
55
 
96
56
void MultiThreadScheduler::setStackSize()
106
66
 
107
67
  if (err != 0)
108
68
  {
109
 
    errmsg_printf(error::ERROR, _("Unable to get thread stack size"));
 
69
    errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to get thread stack size\n"));
110
70
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
111
71
  }
112
72
 
131
91
#endif
132
92
}
133
93
 
134
 
bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
 
94
bool MultiThreadScheduler::addSession(Session *session)
135
95
{
136
96
  if (thread_count >= max_threads)
137
97
    return true;
138
98
 
139
99
  thread_count.increment();
140
 
  try
141
 
  {
142
 
    session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
143
 
  }
144
 
  catch (std::exception&)
145
 
  {
146
 
    thread_count.decrement();
147
 
    return true;
148
 
  }
149
 
 
150
 
  if (not session->getThread())
151
 
  {
152
 
    thread_count.decrement();
153
 
    return true;
154
 
  }
155
 
 
156
 
  if (not session->getThread()->joinable())
 
100
 
 
101
  boost::thread new_thread(boost::bind(&MultiThreadScheduler::runSession, this, session));
 
102
 
 
103
  if (not new_thread.joinable())
157
104
  {
158
105
    thread_count.decrement();
159
106
    return true;
163
110
}
164
111
 
165
112
 
166
 
void MultiThreadScheduler::killSession(Session *session)
167
 
{
168
 
  boost_thread_shared_ptr thread(session->getThread());
169
 
 
170
 
  if (thread)
171
 
  {
172
 
    thread->interrupt();
173
 
  }
174
 
}
175
 
 
176
 
void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
177
 
{
178
 
  killSession(session.get());
179
 
 
180
 
  session->disconnect();
181
 
 
 
113
void MultiThreadScheduler::killSessionNow(Session *session)
 
114
{
182
115
  /* Locks LOCK_thread_count and deletes session */
183
116
  Session::unlink(session);
184
117
  thread_count.decrement();
186
119
 
187
120
MultiThreadScheduler::~MultiThreadScheduler()
188
121
{
189
 
  boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
 
122
  boost::mutex::scoped_lock scopedLock(LOCK_thread_count);
190
123
  while (thread_count)
191
124
  {
192
125
    COND_thread_count.wait(scopedLock);
193
126
  }
194
127
}
195
128
 
196
 
} // multi_thread namespace
197
 
 
198
129
  
199
130
static int init(drizzled::module::Context &context)
200
131
{
201
132
  
202
 
  context.add(new multi_thread::MultiThreadScheduler("multi_thread"));
 
133
  const module::option_map &vm= context.getOptions();
 
134
  if (vm.count("max-threads"))
 
135
  {
 
136
    if (max_threads > 4096 || max_threads < 1)
 
137
    {
 
138
      errmsg_printf(ERRMSG_LVL_ERROR, _("Invalid value for max-threads\n"));
 
139
      exit(-1);
 
140
    }
 
141
  }
 
142
 
 
143
  scheduler= new MultiThreadScheduler("multi_thread");
 
144
  context.add(scheduler);
203
145
 
204
146
  return 0;
205
147
}
206
148
 
 
149
static DRIZZLE_SYSVAR_UINT(max_threads, max_threads,
 
150
                           PLUGIN_VAR_RQCMDARG,
 
151
                           N_("Maximum number of user threads available."),
 
152
                           NULL, NULL, 2048, 1, 4096, 0);
 
153
 
207
154
static void init_options(drizzled::module::option_context &context)
208
155
{
209
156
  context("max-threads",
210
 
          po::value<max_threads_constraint>(&max_threads)->default_value(2048),
211
 
          _("Maximum number of user threads available."));
 
157
          po::value<uint32_t>(&max_threads)->default_value(2048),
 
158
          N_("Maximum number of user threads available."));
212
159
}
213
160
 
 
161
static drizzle_sys_var* sys_variables[]= {
 
162
  DRIZZLE_SYSVAR(max_threads),
 
163
  NULL
 
164
};
 
165
 
214
166
DRIZZLE_DECLARE_PLUGIN
215
167
{
216
168
  DRIZZLE_VERSION_ID,
220
172
  "One Thread Per Session Scheduler",
221
173
  PLUGIN_LICENSE_GPL,
222
174
  init, /* Plugin Init */
223
 
  NULL,   /* depends */
 
175
  sys_variables,   /* system variables */
224
176
  init_options    /* config options */
225
177
}
226
178
DRIZZLE_DECLARE_PLUGIN_END;