~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/thr_lock.cc

  • Committer: Brian Aker
  • Date: 2010-09-22 22:25:29 UTC
  • mto: (1791.1.1 drizzle)
  • mto: This revision was merged to the branch mainline in revision 1792.
  • Revision ID: brian@tangent.org-20100922222529-geo4wggmu5ntqa5k
Current boost work (more conversion).

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All tread must acquire
56
56
#include "drizzled/internal/my_sys.h"
57
57
#include "drizzled/internal/thread_var.h"
58
58
#include "drizzled/statistics_variables.h"
59
 
#include "drizzled/pthread_globals.h"
60
59
 
61
60
#include "drizzled/session.h"
 
61
#include "drizzled/current_session.h"
62
62
 
63
63
#include "thr_lock.h"
64
64
#include "drizzled/internal/m_string.h"
78
78
 
79
79
#include <drizzled/util/test.h>
80
80
 
81
 
#include <boost/interprocess/sync/lock_options.hpp>
82
 
 
83
81
using namespace std;
84
82
 
85
83
namespace drizzled
89
87
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
90
88
 
91
89
 
92
 
uint64_t max_write_lock_count= UINT64_MAX;
93
 
 
94
 
void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count);
 
90
uint64_t max_write_lock_count= ~(uint64_t) 0L;
95
91
 
96
92
/*
97
93
** For the future (now the thread specific cond is alloced by my_pthread.c)
119
115
void THR_LOCK_INFO::init()
120
116
{
121
117
  internal::st_my_thread_var *tmp= my_thread_var;
 
118
  thread= tmp->pthread_self;
122
119
  thread_id= tmp->id;
123
120
  n_cursors= 0;
124
121
}
149
146
static void wake_up_waiters(THR_LOCK *lock);
150
147
 
151
148
 
152
 
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
 
149
static enum enum_thr_lock_result wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, bool in_wait_list)
153
150
{
154
 
  internal::st_my_thread_var *thread_var= session.getThreadVar();
 
151
  Session *session= current_session;
 
152
  internal::st_my_thread_var *thread_var= session->getThreadVar();
155
153
 
156
 
  boost::condition_variable_any *cond= &thread_var->suspend;
 
154
  boost::condition_variable *cond= &thread_var->suspend;
 
155
  struct timespec wait_timeout;
157
156
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
158
157
  bool can_deadlock= test(data->owner->info->n_cursors);
159
158
 
 
159
  if (!in_wait_list)
160
160
  {
161
161
    (*wait->last)=data;                         /* Wait for lock */
162
162
    data->prev= wait->last;
170
170
  thread_var->current_cond=  &thread_var->suspend;
171
171
  data->cond= &thread_var->suspend;;
172
172
 
173
 
  while (not thread_var->abort)
 
173
  if (can_deadlock)
 
174
    set_timespec(wait_timeout, table_lock_wait_timeout);
 
175
  while (!thread_var->abort || in_wait_list)
174
176
  {
175
 
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
176
 
 
177
 
    if (can_deadlock)
178
 
    {
179
 
      boost::xtime xt; 
180
 
      xtime_get(&xt, boost::TIME_UTC); 
181
 
      xt.sec += table_lock_wait_timeout; 
182
 
      if (not cond->timed_wait(scoped, xt))
183
 
      {
184
 
        result= THR_LOCK_WAIT_TIMEOUT;
185
 
        scoped.release();
186
 
        break;
187
 
      }
188
 
    }
189
 
    else
190
 
    {
191
 
      cond->wait(scoped);
192
 
    }
 
177
    int rc= (can_deadlock ?
 
178
             pthread_cond_timedwait(cond->native_handle(), data->lock->native_handle()->native_handle(), &wait_timeout) :
 
179
             pthread_cond_wait(cond->native_handle(), data->lock->native_handle()->native_handle()));
193
180
    /*
194
181
      We must break the wait if one of the following occurs:
195
182
      - the connection has been aborted (!thread_var->abort), but
205
192
    */
206
193
    if (data->cond == NULL)
207
194
    {
208
 
      scoped.release();
209
 
      break;
210
 
    }
211
 
    scoped.release();
 
195
      break;
 
196
    }
 
197
    if (rc == ETIMEDOUT || rc == ETIME)
 
198
    {
 
199
      result= THR_LOCK_WAIT_TIMEOUT;
 
200
      break;
 
201
    }
212
202
  }
213
203
  if (data->cond || data->type == TL_UNLOCK)
214
204
  {
229
219
  data->lock->unlock();
230
220
 
231
221
  /* The following must be done after unlock of lock->mutex */
232
 
  boost_unique_lock_t scopedLock(thread_var->mutex);
 
222
  boost::mutex::scoped_lock scopedLock(thread_var->mutex);
233
223
  thread_var->current_mutex= NULL;
234
224
  thread_var->current_cond= NULL;
235
225
  return(result);
236
226
}
237
227
 
238
228
 
239
 
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
 
229
static enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
240
230
{
241
231
  THR_LOCK *lock= data->lock;
242
232
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
384
374
  }
385
375
 
386
376
  /* Can't get lock yet;  Wait for it */
387
 
  return(wait_for_lock(session, wait_queue, data));
 
377
  return(wait_for_lock(wait_queue, data, 0));
388
378
end:
389
379
  lock->unlock();
390
380
 
406
396
 
407
397
  do
408
398
  {
409
 
    boost::condition_variable_any *cond= data->cond;
 
399
    boost::condition_variable *cond= data->cond;
410
400
    if ((int) data->type == (int) TL_READ_NO_INSERT)
411
401
    {
412
402
      if (using_concurrent_insert)
504
494
          lock->write.last= &data->next;
505
495
 
506
496
          {
507
 
            boost::condition_variable_any *cond= data->cond;
 
497
            boost::condition_variable *cond= data->cond;
508
498
            data->cond= NULL;                           /* Mark thread free */
509
499
            cond->notify_one(); /* Start waiting thred */
510
500
          }
531
521
              !lock->read_no_write_count))
532
522
    {
533
523
      do {
534
 
        boost::condition_variable_any *cond= data->cond;
 
524
        boost::condition_variable *cond= data->cond;
535
525
        if (((*data->prev)=data->next))         /* remove from wait-list */
536
526
          data->next->prev= data->prev;
537
527
        else
591
581
 
592
582
 
593
583
enum enum_thr_lock_result
594
 
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
 
584
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
595
585
{
596
586
  THR_LOCK_DATA **pos,**end;
597
587
  if (count > 1)
599
589
  /* lock everything */
600
590
  for (pos=data,end=data+count; pos < end ; pos++)
601
591
  {
602
 
    enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
 
592
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
603
593
    if (result != THR_LOCK_SUCCESS)
604
594
    {                                           /* Aborted */
605
595
      thr_multi_unlock(data,(uint32_t) (pos-data));
640
630
  return;
641
631
}
642
632
 
643
 
void DrizzleLock::unlock(uint32_t count)
644
 
{
645
 
  THR_LOCK_DATA **pos,**end;
646
 
 
647
 
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
648
 
  {
649
 
    if ((*pos)->type != TL_UNLOCK)
650
 
      thr_unlock(*pos);
651
 
  }
652
 
}
653
 
 
654
633
/*
655
634
  Abort all threads waiting for a lock. The lock will be upgraded to
656
635
  TL_WRITE_ONLY to abort any new accesses to the lock
658
637
 
659
638
void THR_LOCK::abort_locks()
660
639
{
661
 
  boost_unique_lock_t scopedLock(mutex);
 
640
  boost::mutex::scoped_lock scopedLock(mutex);
662
641
 
663
642
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
664
643
  {
691
670
{
692
671
  bool found= false;
693
672
 
694
 
  boost_unique_lock_t scopedLock(mutex);
 
673
  boost::mutex::scoped_lock scopedLock(mutex);
695
674
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
696
675
  {
697
676
    if (local_data->owner->info->thread_id == thread_id_arg)