~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/multi_thread/multi_thread.cc

fix pthread atomics. operator precedence is important. The unit test now passes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
#include "config.h"
17
17
#include <plugin/multi_thread/multi_thread.h>
18
18
#include "drizzled/pthread_globals.h"
19
 
#include <boost/program_options.hpp>
20
 
#include <drizzled/module/option_map.h>
21
 
#include <drizzled/errmsg_print.h>
22
 
#include "drizzled/session.h"
23
 
#include "drizzled/session_list.h"
24
 
 
25
 
#include <boost/thread.hpp>
26
 
#include <boost/bind.hpp>
27
 
 
28
 
namespace po= boost::program_options;
 
19
 
29
20
using namespace std;
30
21
using namespace drizzled;
31
22
 
32
23
/* Configuration variables. */
33
 
typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
34
 
static max_threads_constraint max_threads;
35
 
 
36
 
namespace drizzled
37
 
{
38
 
  extern size_t my_thread_stack_size;
39
 
}
40
 
 
41
 
void MultiThreadScheduler::runSession(drizzled::session_id_t id)
42
 
{
43
 
  char stack_dummy;
44
 
  Session::shared_ptr session(session::Cache::singleton().find(id));
45
 
 
46
 
  if (not session)
47
 
  {
48
 
    std::cerr << "Session killed before thread could execute\n";
49
 
    return;
50
 
  }
51
 
 
52
 
  if (drizzled::internal::my_thread_init())
53
 
  {
54
 
    session->disconnect(drizzled::ER_OUT_OF_RESOURCES, true);
55
 
    session->status_var.aborted_connects++;
56
 
    killSessionNow(session);
57
 
  }
58
 
  boost::this_thread::at_thread_exit(&internal::my_thread_end);
59
 
 
60
 
  session->thread_stack= (char*) &stack_dummy;
61
 
  session->run();
62
 
  killSessionNow(session);
63
 
  // @todo remove hard spin by disconnection the session first from the
64
 
  // thread.
65
 
  while (not session.unique()) {}
66
 
}
67
 
 
68
 
void MultiThreadScheduler::setStackSize()
69
 
{
70
 
  pthread_attr_t attr;
71
 
 
72
 
  (void) pthread_attr_init(&attr);
73
 
 
74
 
  /* Get the thread stack size that the OS will use and make sure
75
 
    that we update our global variable. */
76
 
  int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
77
 
  pthread_attr_destroy(&attr);
78
 
 
79
 
  if (err != 0)
80
 
  {
81
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to get thread stack size\n"));
82
 
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
83
 
  }
84
 
 
85
 
  if (my_thread_stack_size == 0)
86
 
  {
87
 
    my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
88
 
  }
89
 
#ifdef __sun
90
 
  /*
91
 
   * Solaris will return zero for the stack size in a call to
92
 
   * pthread_attr_getstacksize() to indicate that the OS default stack
93
 
   * size is used. We need an actual value in my_thread_stack_size so that
94
 
   * check_stack_overrun() will work. The Solaris man page for the
95
 
   * pthread_attr_getstacksize() function says that 2M is used for 64-bit
96
 
   * processes. We'll explicitly set it here to make sure that is what
97
 
   * will be used.
98
 
   */
99
 
  if (my_thread_stack_size == 0)
100
 
  {
101
 
    my_thread_stack_size= 2 * 1024 * 1024;
102
 
  }
103
 
#endif
104
 
}
105
 
 
106
 
bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
 
24
static uint32_t max_threads;
 
25
 
 
26
/* Global's (TBR) */
 
27
static MultiThreadScheduler *scheduler= NULL;
 
28
 
 
29
/**
 
30
 * Function to be run as a thread for each session.
 
31
 */
 
32
namespace
 
33
{
 
34
  extern "C" pthread_handler_t session_thread(void *arg);
 
35
}
 
36
 
 
37
namespace
 
38
{
 
39
  extern "C" pthread_handler_t session_thread(void *arg)
 
40
  {
 
41
    Session *session= static_cast<Session*>(arg);
 
42
    MultiThreadScheduler *sched= static_cast<MultiThreadScheduler*>(session->scheduler);
 
43
    sched->runSession(session);
 
44
    return NULL;
 
45
  }
 
46
}
 
47
 
 
48
 
 
49
bool MultiThreadScheduler::addSession(Session *session)
107
50
{
108
51
  if (thread_count >= max_threads)
109
52
    return true;
110
53
 
111
54
  thread_count.increment();
112
55
 
113
 
  boost::thread new_thread(boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()));
114
 
 
115
 
  if (not new_thread.joinable())
 
56
  if (pthread_create(&session->real_id, &attr, session_thread,
 
57
                     static_cast<void*>(session)))
116
58
  {
117
59
    thread_count.decrement();
118
60
    return true;
122
64
}
123
65
 
124
66
 
125
 
void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
 
67
void MultiThreadScheduler::killSessionNow(Session *session)
126
68
{
127
69
  /* Locks LOCK_thread_count and deletes session */
128
70
  Session::unlink(session);
129
71
  thread_count.decrement();
 
72
  internal::my_thread_end();
 
73
  pthread_exit(0);
 
74
  /* We should never reach this point. */
130
75
}
131
76
 
132
77
MultiThreadScheduler::~MultiThreadScheduler()
133
78
{
134
 
  boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
 
79
  (void) pthread_mutex_lock(&LOCK_thread_count);
135
80
  while (thread_count)
136
81
  {
137
 
    COND_thread_count.wait(scopedLock);
 
82
    pthread_cond_wait(&COND_thread_count, &LOCK_thread_count);
138
83
  }
 
84
 
 
85
  (void) pthread_mutex_unlock(&LOCK_thread_count);
 
86
  (void) pthread_attr_destroy(&attr);
139
87
}
140
88
 
141
89
  
142
90
static int init(drizzled::module::Context &context)
143
91
{
144
 
  
145
 
  context.add(new MultiThreadScheduler("multi_thread"));
 
92
  scheduler= new MultiThreadScheduler("multi_thread");
 
93
  context.add(scheduler);
146
94
 
147
95
  return 0;
148
96
}
149
97
 
150
 
static void init_options(drizzled::module::option_context &context)
151
 
{
152
 
  context("max-threads",
153
 
          po::value<max_threads_constraint>(&max_threads)->default_value(2048),
154
 
          N_("Maximum number of user threads available."));
155
 
}
 
98
static DRIZZLE_SYSVAR_UINT(max_threads, max_threads,
 
99
                           PLUGIN_VAR_RQCMDARG,
 
100
                           N_("Maximum number of user threads available."),
 
101
                           NULL, NULL, 2048, 1, 4096, 0);
 
102
 
 
103
static drizzle_sys_var* sys_variables[]= {
 
104
  DRIZZLE_SYSVAR(max_threads),
 
105
  NULL
 
106
};
156
107
 
157
108
DRIZZLE_DECLARE_PLUGIN
158
109
{
163
114
  "One Thread Per Session Scheduler",
164
115
  PLUGIN_LICENSE_GPL,
165
116
  init, /* Plugin Init */
166
 
  NULL,   /* system variables */
167
 
  init_options    /* config options */
 
117
  sys_variables,   /* system variables */
 
118
  NULL    /* config options */
168
119
}
169
120
DRIZZLE_DECLARE_PLUGIN_END;