~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/multi_thread/multi_thread.cc

  • Committer: Olaf van der Spek
  • Date: 2011-06-23 11:44:30 UTC
  • mto: This revision was merged to the branch mainline in revision 2348.
  • Revision ID: olafvdspek@gmail.com-20110623114430-no355yypk4y3icqb
Refactor

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
 
#include "config.h"
17
 
#include <plugin/multi_thread/multi_thread.h>
18
 
#include "drizzled/pthread_globals.h"
19
 
#include <boost/program_options.hpp>
 
16
#include <config.h>
 
17
 
 
18
#include <iostream>
 
19
 
 
20
#include <drizzled/pthread_globals.h>
20
21
#include <drizzled/module/option_map.h>
21
22
#include <drizzled/errmsg_print.h>
22
 
#include "drizzled/session.h"
23
 
#include "drizzled/session/cache.h"
 
23
#include <drizzled/session.h>
 
24
#include <drizzled/session/cache.h>
 
25
#include <drizzled/abort_exception.h>
 
26
#include <drizzled/transaction_services.h>
 
27
#include <drizzled/gettext.h>
 
28
#include <drizzled/plugin.h>
 
29
#include <drizzled/statistics_variables.h>
24
30
 
25
31
#include <boost/thread.hpp>
26
32
#include <boost/bind.hpp>
 
33
#include <boost/program_options.hpp>
 
34
 
 
35
#include "multi_thread.h"
27
36
 
28
37
namespace po= boost::program_options;
29
38
using namespace std;
44
53
{
45
54
  char stack_dummy;
46
55
  boost::this_thread::disable_interruption disable_by_default;
47
 
  Session::shared_ptr session(session::Cache::singleton().find(id));
48
 
 
49
 
  if (not session)
50
 
  {
51
 
    std::cerr << "Session killed before thread could execute\n";
52
 
    return;
53
 
  }
54
 
  session->pushInterrupt(&disable_by_default);
55
 
 
56
 
  if (drizzled::internal::my_thread_init())
57
 
  {
58
 
    session->disconnect(drizzled::ER_OUT_OF_RESOURCES, true);
59
 
    session->status_var.aborted_connects++;
 
56
 
 
57
  Session::shared_ptr session(session::Cache::find(id));
 
58
 
 
59
  try
 
60
  {
 
61
 
 
62
    if (not session)
 
63
    {
 
64
      std::cerr << _("Session killed before thread could execute") << endl;
 
65
      return;
 
66
    }
 
67
    session->pushInterrupt(&disable_by_default);
 
68
    drizzled::internal::my_thread_init();
 
69
    session->thread_stack= (char*) &stack_dummy;
 
70
    session->run();
 
71
 
60
72
    killSessionNow(session);
61
73
  }
62
 
  boost::this_thread::at_thread_exit(&internal::my_thread_end);
 
74
  catch (abort_exception& ex)
 
75
  {
 
76
    cout << _("Drizzle has receieved an abort event.") << endl;
 
77
    cout << _("In Function: ") << *::boost::get_error_info<boost::throw_function>(ex) << endl;
 
78
    cout << _("In File: ") << *::boost::get_error_info<boost::throw_file>(ex) << endl;
 
79
    cout << _("On Line: ") << *::boost::get_error_info<boost::throw_line>(ex) << endl;
63
80
 
64
 
  session->thread_stack= (char*) &stack_dummy;
65
 
  session->run();
66
 
  killSessionNow(session);
 
81
    TransactionServices::sendShutdownEvent(*session.get());
 
82
  }
67
83
  // @todo remove hard spin by disconnecting the session first from the
68
84
  // thread.
69
85
  while (not session.unique()) {}
82
98
 
83
99
  if (err != 0)
84
100
  {
85
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to get thread stack size\n"));
 
101
    errmsg_printf(error::ERROR, _("Unable to get thread stack size"));
86
102
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
87
103
  }
88
104
 
107
123
#endif
108
124
}
109
125
 
110
 
bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
 
126
bool MultiThreadScheduler::addSession(const Session::shared_ptr& session)
111
127
{
112
128
  if (thread_count >= max_threads)
113
129
    return true;
114
130
 
115
131
  thread_count.increment();
116
 
 
117
 
  session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
 
132
  try
 
133
  {
 
134
    session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
 
135
  }
 
136
  catch (std::exception&)
 
137
  {
 
138
    thread_count.decrement();
 
139
    return true;
 
140
  }
118
141
 
119
142
  if (not session->getThread())
120
143
  {
134
157
 
135
158
void MultiThreadScheduler::killSession(Session *session)
136
159
{
137
 
  boost_thread_shared_ptr thread(session->getThread());
 
160
  thread_ptr thread(session->getThread());
138
161
 
139
162
  if (thread)
140
163
  {
142
165
  }
143
166
}
144
167
 
145
 
void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
 
168
void MultiThreadScheduler::killSessionNow(const Session::shared_ptr& session)
146
169
{
147
170
  killSession(session.get());
 
171
 
 
172
  session->disconnect();
 
173
 
148
174
  /* Locks LOCK_thread_count and deletes session */
149
175
  Session::unlink(session);
150
176
  thread_count.decrement();
152
178
 
153
179
MultiThreadScheduler::~MultiThreadScheduler()
154
180
{
155
 
  boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
 
181
  boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::mutex());
156
182
  while (thread_count)
157
183
  {
158
184
    COND_thread_count.wait(scopedLock);
174
200
{
175
201
  context("max-threads",
176
202
          po::value<max_threads_constraint>(&max_threads)->default_value(2048),
177
 
          N_("Maximum number of user threads available."));
 
203
          _("Maximum number of user threads available."));
178
204
}
179
205
 
180
206
DRIZZLE_DECLARE_PLUGIN
186
212
  "One Thread Per Session Scheduler",
187
213
  PLUGIN_LICENSE_GPL,
188
214
  init, /* Plugin Init */
189
 
  NULL,   /* system variables */
 
215
  NULL,   /* depends */
190
216
  init_options    /* config options */
191
217
}
192
218
DRIZZLE_DECLARE_PLUGIN_END;