~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/multi_thread/multi_thread.cc

  • Committer: Brian Aker
  • Date: 2009-08-18 07:20:29 UTC
  • mfrom: (1117.1.9 merge)
  • Revision ID: brian@gaz-20090818072029-s9ch5lcmltxwidn7
Merge of Brian

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
 
#include "config.h"
17
 
 
18
 
#include <iostream>
19
 
 
20
 
#include <drizzled/pthread_globals.h>
21
 
#include <drizzled/module/option_map.h>
22
 
#include <drizzled/errmsg_print.h>
 
16
#include <drizzled/server_includes.h>
 
17
#include <drizzled/atomics.h>
 
18
#include <drizzled/gettext.h>
 
19
#include <drizzled/error.h>
 
20
#include <drizzled/plugin/scheduler.h>
 
21
#include <drizzled/sql_parse.h>
23
22
#include <drizzled/session.h>
24
 
#include <drizzled/session/cache.h>
25
 
#include <drizzled/abort_exception.h>
26
 
#include <drizzled/transaction_services.h>
27
 
#include <drizzled/gettext.h>
28
 
 
29
 
#include <boost/thread.hpp>
30
 
#include <boost/bind.hpp>
31
 
#include <boost/program_options.hpp>
32
 
 
33
 
#include "multi_thread.h"
34
 
 
35
 
namespace po= boost::program_options;
 
23
#include <string>
 
24
 
36
25
using namespace std;
37
26
using namespace drizzled;
38
27
 
39
28
/* Configuration variables. */
40
 
typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
41
 
static max_threads_constraint max_threads;
 
29
static uint32_t max_threads;
42
30
 
43
 
namespace drizzled
 
31
/**
 
32
 * Function to be run as a thread for each session.
 
33
 */
 
34
namespace
44
35
{
45
 
  extern size_t my_thread_stack_size;
 
36
  extern "C" pthread_handler_t session_thread(void *arg);
46
37
}
47
38
 
48
 
namespace multi_thread {
49
 
 
50
 
void MultiThreadScheduler::runSession(drizzled::session_id_t id)
 
39
class MultiThreadScheduler: public plugin::Scheduler
51
40
{
52
 
  char stack_dummy;
53
 
  boost::this_thread::disable_interruption disable_by_default;
54
 
 
55
 
  Session::shared_ptr session(session::Cache::singleton().find(id));
56
 
 
57
 
  try
58
 
  {
59
 
 
60
 
    if (not session)
61
 
    {
62
 
      std::cerr << _("Session killed before thread could execute") << endl;
63
 
      return;
64
 
    }
65
 
    session->pushInterrupt(&disable_by_default);
66
 
 
67
 
    if (drizzled::internal::my_thread_init())
68
 
    {
69
 
      session->disconnect(drizzled::ER_OUT_OF_RESOURCES);
70
 
      session->status_var.aborted_connects++;
71
 
    }
72
 
    else
73
 
    {
74
 
      boost::this_thread::at_thread_exit(&internal::my_thread_end);
75
 
 
76
 
      session->thread_stack= (char*) &stack_dummy;
77
 
      session->run();
78
 
    }
79
 
 
 
41
private:
 
42
  drizzled::atomic<uint32_t> thread_count;
 
43
  pthread_attr_t attr;
 
44
 
 
45
public:
 
46
  MultiThreadScheduler(): Scheduler()
 
47
  {
 
48
    struct sched_param tmp_sched_param;
 
49
 
 
50
    memset(&tmp_sched_param, 0, sizeof(struct sched_param));
 
51
 
 
52
    /* Setup attribute parameter for session threads. */
 
53
    (void) pthread_attr_init(&attr);
 
54
    (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 
55
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
 
56
 
 
57
    tmp_sched_param.sched_priority= WAIT_PRIOR;
 
58
    (void) pthread_attr_setschedparam(&attr, &tmp_sched_param);
 
59
 
 
60
    thread_count= 0;
 
61
  }
 
62
 
 
63
  ~MultiThreadScheduler()
 
64
  {
 
65
    (void) pthread_mutex_lock(&LOCK_thread_count);
 
66
    while (thread_count)
 
67
    {
 
68
      pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
 
69
    }
 
70
 
 
71
    (void) pthread_mutex_unlock(&LOCK_thread_count);
 
72
    (void) pthread_attr_destroy(&attr);
 
73
  }
 
74
 
 
75
  virtual bool addSession(Session *session)
 
76
  {
 
77
    if (thread_count >= max_threads)
 
78
      return true;
 
79
 
 
80
    thread_count++;
 
81
  
 
82
    if (pthread_create(&session->real_id, &attr, session_thread,
 
83
                       static_cast<void*>(session)))
 
84
    {
 
85
      thread_count--;
 
86
      return true;
 
87
    }
 
88
  
 
89
    return false;
 
90
  }
 
91
  
 
92
  void runSession(Session *session)
 
93
  {
 
94
    if (my_thread_init())
 
95
    {
 
96
      session->disconnect(ER_OUT_OF_RESOURCES, true);
 
97
      statistic_increment(aborted_connects, &LOCK_status);
 
98
      killSessionNow(session);
 
99
    }
 
100
 
 
101
    session->thread_stack= (char*) &session;
 
102
    session->run();
80
103
    killSessionNow(session);
81
104
  }
82
 
  catch (abort_exception& ex)
83
 
  {
84
 
    cout << _("Drizzle has receieved an abort event.") << endl;
85
 
    cout << _("In Function: ") << *::boost::get_error_info<boost::throw_function>(ex) << endl;
86
 
    cout << _("In File: ") << *::boost::get_error_info<boost::throw_file>(ex) << endl;
87
 
    cout << _("On Line: ") << *::boost::get_error_info<boost::throw_line>(ex) << endl;
88
 
 
89
 
    TransactionServices::singleton().sendShutdownEvent(*session.get());
90
 
  }
91
 
  // @todo remove hard spin by disconnection the session first from the
92
 
  // thread.
93
 
  while (not session.unique()) {}
94
 
}
95
 
 
96
 
void MultiThreadScheduler::setStackSize()
97
 
{
98
 
  pthread_attr_t attr;
99
 
 
100
 
  (void) pthread_attr_init(&attr);
101
 
 
102
 
  /* Get the thread stack size that the OS will use and make sure
103
 
    that we update our global variable. */
104
 
  int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
105
 
  pthread_attr_destroy(&attr);
106
 
 
107
 
  if (err != 0)
108
 
  {
109
 
    errmsg_printf(error::ERROR, _("Unable to get thread stack size"));
110
 
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
111
 
  }
112
 
 
113
 
  if (my_thread_stack_size == 0)
114
 
  {
115
 
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
116
 
  }
117
 
#ifdef __sun
118
 
  /*
119
 
   * Solaris will return zero for the stack size in a call to
120
 
   * pthread_attr_getstacksize() to indicate that the OS default stack
121
 
   * size is used. We need an actual value in my_thread_stack_size so that
122
 
   * check_stack_overrun() will work. The Solaris man page for the
123
 
   * pthread_attr_getstacksize() function says that 2M is used for 64-bit
124
 
   * processes. We'll explicitly set it here to make sure that is what
125
 
   * will be used.
126
 
   */
127
 
  if (my_thread_stack_size == 0)
128
 
  {
129
 
    my_thread_stack_size= 2 * 1024 * 1024;
130
 
  }
131
 
#endif
132
 
}
133
 
 
134
 
bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
135
 
{
136
 
  if (thread_count >= max_threads)
137
 
    return true;
138
 
 
139
 
  thread_count.increment();
140
 
  try
141
 
  {
142
 
    session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
143
 
  }
144
 
  catch (std::exception&)
145
 
  {
146
 
    thread_count.decrement();
147
 
    return true;
148
 
  }
149
 
 
150
 
  if (not session->getThread())
151
 
  {
152
 
    thread_count.decrement();
153
 
    return true;
154
 
  }
155
 
 
156
 
  if (not session->getThread()->joinable())
157
 
  {
158
 
    thread_count.decrement();
159
 
    return true;
160
 
  }
161
 
 
162
 
  return false;
163
 
}
164
 
 
165
 
 
166
 
void MultiThreadScheduler::killSession(Session *session)
167
 
{
168
 
  boost_thread_shared_ptr thread(session->getThread());
169
 
 
170
 
  if (thread)
171
 
  {
172
 
    thread->interrupt();
173
 
  }
174
 
}
175
 
 
176
 
void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
177
 
{
178
 
  killSession(session.get());
179
 
 
180
 
  session->disconnect();
181
 
 
182
 
  /* Locks LOCK_thread_count and deletes session */
183
 
  Session::unlink(session);
184
 
  thread_count.decrement();
185
 
}
186
 
 
187
 
MultiThreadScheduler::~MultiThreadScheduler()
188
 
{
189
 
  boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
190
 
  while (thread_count)
191
 
  {
192
 
    COND_thread_count.wait(scopedLock);
193
 
  }
194
 
}
195
 
 
196
 
} // multi_thread namespace
197
 
 
198
 
  
199
 
static int init(drizzled::module::Context &context)
200
 
{
201
 
  
202
 
  context.add(new multi_thread::MultiThreadScheduler("multi_thread"));
203
 
 
204
 
  return 0;
205
 
}
206
 
 
207
 
static void init_options(drizzled::module::option_context &context)
208
 
{
209
 
  context("max-threads",
210
 
          po::value<max_threads_constraint>(&max_threads)->default_value(2048),
211
 
          _("Maximum number of user threads available."));
212
 
}
213
 
 
214
 
DRIZZLE_DECLARE_PLUGIN
215
 
{
216
 
  DRIZZLE_VERSION_ID,
 
105
 
 
106
  void killSessionNow(Session *session)
 
107
  {
 
108
    /* Locks LOCK_thread_count and deletes session */
 
109
    unlink_session(session);
 
110
    thread_count--;
 
111
    my_thread_end();
 
112
    pthread_exit(0);
 
113
    /* We should never reach this point. */
 
114
  }
 
115
};
 
116
 
 
117
namespace
 
118
{
 
119
  extern "C" pthread_handler_t session_thread(void *arg)
 
120
  {
 
121
    Session *session= static_cast<Session*>(arg);
 
122
    MultiThreadScheduler *scheduler= static_cast<MultiThreadScheduler*>(session->scheduler);
 
123
    scheduler->runSession(session);
 
124
    return NULL;
 
125
  }
 
126
}
 
127
 
 
128
class MultiThreadFactory : public plugin::SchedulerFactory
 
129
{
 
130
public:
 
131
  MultiThreadFactory() : SchedulerFactory("multi_thread")
 
132
  {
 
133
    addAlias("multi-thread");
 
134
  }
 
135
 
 
136
  ~MultiThreadFactory()
 
137
  {
 
138
    if (scheduler != NULL)
 
139
      delete scheduler;
 
140
  }
 
141
 
 
142
  plugin::Scheduler *operator() ()
 
143
  {
 
144
    if (scheduler == NULL)
 
145
      scheduler= new MultiThreadScheduler();
 
146
    return scheduler;
 
147
  }
 
148
};
 
149
 
 
150
static MultiThreadFactory *factory= NULL;
 
151
 
 
152
static int init(drizzled::plugin::Registry &registry)
 
153
{
 
154
  factory= new MultiThreadFactory();
 
155
  registry.add(factory);
 
156
  return 0;
 
157
}
 
158
 
 
159
static int deinit(drizzled::plugin::Registry &registry)
 
160
{
 
161
  if (factory)
 
162
  {
 
163
    registry.remove(factory);
 
164
    delete factory;
 
165
  }
 
166
  return 0;
 
167
}
 
168
 
 
169
static DRIZZLE_SYSVAR_UINT(max_threads, max_threads,
 
170
                           PLUGIN_VAR_RQCMDARG,
 
171
                           N_("Maximum number of user threads available."),
 
172
                           NULL, NULL, 2048, 1, 4096, 0);
 
173
 
 
174
static struct st_mysql_sys_var* system_variables[]= {
 
175
  DRIZZLE_SYSVAR(max_threads),
 
176
  NULL
 
177
};
 
178
 
 
179
drizzle_declare_plugin(multi_thread)
 
180
{
217
181
  "multi_thread",
218
182
  "0.1",
219
183
  "Brian Aker",
220
184
  "One Thread Per Session Scheduler",
221
185
  PLUGIN_LICENSE_GPL,
222
186
  init, /* Plugin Init */
223
 
  NULL,   /* depends */
224
 
  init_options    /* config options */
 
187
  deinit, /* Plugin Deinit */
 
188
  NULL,   /* status variables */
 
189
  system_variables,   /* system variables */
 
190
  NULL    /* config options */
225
191
}
226
 
DRIZZLE_DECLARE_PLUGIN_END;
 
192
drizzle_declare_plugin_end;