~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/multi_thread/multi_thread.cc

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
#include "config.h"
17
 
#include <plugin/multi_thread/multi_thread.h>
18
 
#include "drizzled/pthread_globals.h"
19
 
#include <boost/program_options.hpp>
 
17
 
 
18
#include <iostream>
 
19
 
 
20
#include <drizzled/pthread_globals.h>
20
21
#include <drizzled/module/option_map.h>
21
22
#include <drizzled/errmsg_print.h>
 
23
#include <drizzled/session.h>
 
24
#include <drizzled/session/cache.h>
 
25
#include <drizzled/abort_exception.h>
 
26
#include <drizzled/transaction_services.h>
 
27
#include <drizzled/gettext.h>
22
28
 
23
29
#include <boost/thread.hpp>
24
30
#include <boost/bind.hpp>
 
31
#include <boost/program_options.hpp>
 
32
 
 
33
#include "multi_thread.h"
25
34
 
26
35
namespace po= boost::program_options;
27
36
using namespace std;
28
37
using namespace drizzled;
29
38
 
30
39
/* Configuration variables. */
31
 
static uint32_t max_threads;
32
 
 
33
 
/* Global's (TBR) */
34
 
static MultiThreadScheduler *scheduler= NULL;
 
40
typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
 
41
static max_threads_constraint max_threads;
35
42
 
36
43
namespace drizzled
37
44
{
38
45
  extern size_t my_thread_stack_size;
39
46
}
40
47
 
41
 
void MultiThreadScheduler::runSession(drizzled::Session *session)
 
48
namespace multi_thread {
 
49
 
 
50
void MultiThreadScheduler::runSession(drizzled::session_id_t id)
42
51
{
43
 
  if (drizzled::internal::my_thread_init())
 
52
  char stack_dummy;
 
53
  boost::this_thread::disable_interruption disable_by_default;
 
54
 
 
55
  Session::shared_ptr session(session::Cache::singleton().find(id));
 
56
 
 
57
  try
44
58
  {
45
 
    session->disconnect(drizzled::ER_OUT_OF_RESOURCES, true);
46
 
    session->status_var.aborted_connects++;
 
59
 
 
60
    if (not session)
 
61
    {
 
62
      std::cerr << _("Session killed before thread could execute") << endl;
 
63
      return;
 
64
    }
 
65
    session->pushInterrupt(&disable_by_default);
 
66
 
 
67
    if (drizzled::internal::my_thread_init())
 
68
    {
 
69
      session->disconnect(drizzled::ER_OUT_OF_RESOURCES);
 
70
      session->status_var.aborted_connects++;
 
71
    }
 
72
    else
 
73
    {
 
74
      boost::this_thread::at_thread_exit(&internal::my_thread_end);
 
75
 
 
76
      session->thread_stack= (char*) &stack_dummy;
 
77
      session->run();
 
78
    }
 
79
 
47
80
    killSessionNow(session);
48
81
  }
49
 
  boost::this_thread::at_thread_exit(&internal::my_thread_end);
 
82
  catch (abort_exception& ex)
 
83
  {
 
84
    cout << _("Drizzle has receieved an abort event.") << endl;
 
85
    cout << _("In Function: ") << *::boost::get_error_info<boost::throw_function>(ex) << endl;
 
86
    cout << _("In File: ") << *::boost::get_error_info<boost::throw_file>(ex) << endl;
 
87
    cout << _("On Line: ") << *::boost::get_error_info<boost::throw_line>(ex) << endl;
50
88
 
51
 
  session->thread_stack= (char*) &session;
52
 
  session->run();
53
 
  killSessionNow(session);
 
89
    TransactionServices::singleton().sendShutdownEvent(*session.get());
 
90
  }
 
91
  // @todo remove hard spin by disconnection the session first from the
 
92
  // thread.
 
93
  while (not session.unique()) {}
54
94
}
55
95
 
56
96
void MultiThreadScheduler::setStackSize()
66
106
 
67
107
  if (err != 0)
68
108
  {
69
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to get thread stack size\n"));
 
109
    errmsg_printf(error::ERROR, _("Unable to get thread stack size"));
70
110
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
71
111
  }
72
112
 
91
131
#endif
92
132
}
93
133
 
94
 
bool MultiThreadScheduler::addSession(Session *session)
 
134
bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
95
135
{
96
136
  if (thread_count >= max_threads)
97
137
    return true;
98
138
 
99
139
  thread_count.increment();
100
 
 
101
 
  boost::thread new_thread(boost::bind(&MultiThreadScheduler::runSession, this, session));
102
 
 
103
 
  if (not new_thread.joinable())
 
140
  try
 
141
  {
 
142
    session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
 
143
  }
 
144
  catch (std::exception&)
 
145
  {
 
146
    thread_count.decrement();
 
147
    return true;
 
148
  }
 
149
 
 
150
  if (not session->getThread())
 
151
  {
 
152
    thread_count.decrement();
 
153
    return true;
 
154
  }
 
155
 
 
156
  if (not session->getThread()->joinable())
104
157
  {
105
158
    thread_count.decrement();
106
159
    return true;
110
163
}
111
164
 
112
165
 
113
 
void MultiThreadScheduler::killSessionNow(Session *session)
114
 
{
 
166
void MultiThreadScheduler::killSession(Session *session)
 
167
{
 
168
  boost_thread_shared_ptr thread(session->getThread());
 
169
 
 
170
  if (thread)
 
171
  {
 
172
    thread->interrupt();
 
173
  }
 
174
}
 
175
 
 
176
void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
 
177
{
 
178
  killSession(session.get());
 
179
 
 
180
  session->disconnect();
 
181
 
115
182
  /* Locks LOCK_thread_count and deletes session */
116
183
  Session::unlink(session);
117
184
  thread_count.decrement();
119
186
 
120
187
MultiThreadScheduler::~MultiThreadScheduler()
121
188
{
122
 
  boost::mutex::scoped_lock scopedLock(LOCK_thread_count);
 
189
  boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
123
190
  while (thread_count)
124
191
  {
125
192
    COND_thread_count.wait(scopedLock);
126
193
  }
127
194
}
128
195
 
 
196
} // multi_thread namespace
 
197
 
129
198
  
130
199
static int init(drizzled::module::Context &context)
131
200
{
132
201
  
133
 
  const module::option_map &vm= context.getOptions();
134
 
  if (vm.count("max-threads"))
135
 
  {
136
 
    if (max_threads > 4096 || max_threads < 1)
137
 
    {
138
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Invalid value for max-threads\n"));
139
 
      exit(-1);
140
 
    }
141
 
  }
142
 
 
143
 
  scheduler= new MultiThreadScheduler("multi_thread");
144
 
  context.add(scheduler);
 
202
  context.add(new multi_thread::MultiThreadScheduler("multi_thread"));
145
203
 
146
204
  return 0;
147
205
}
148
206
 
149
 
static DRIZZLE_SYSVAR_UINT(max_threads, max_threads,
150
 
                           PLUGIN_VAR_RQCMDARG,
151
 
                           N_("Maximum number of user threads available."),
152
 
                           NULL, NULL, 2048, 1, 4096, 0);
153
 
 
154
207
static void init_options(drizzled::module::option_context &context)
155
208
{
156
209
  context("max-threads",
157
 
          po::value<uint32_t>(&max_threads)->default_value(2048),
158
 
          N_("Maximum number of user threads available."));
 
210
          po::value<max_threads_constraint>(&max_threads)->default_value(2048),
 
211
          _("Maximum number of user threads available."));
159
212
}
160
213
 
161
 
static drizzle_sys_var* sys_variables[]= {
162
 
  DRIZZLE_SYSVAR(max_threads),
163
 
  NULL
164
 
};
165
 
 
166
214
DRIZZLE_DECLARE_PLUGIN
167
215
{
168
216
  DRIZZLE_VERSION_ID,
172
220
  "One Thread Per Session Scheduler",
173
221
  PLUGIN_LICENSE_GPL,
174
222
  init, /* Plugin Init */
175
 
  sys_variables,   /* system variables */
 
223
  NULL,   /* depends */
176
224
  init_options    /* config options */
177
225
}
178
226
DRIZZLE_DECLARE_PLUGIN_END;