~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/thr_lock.cc

  • Committer: Brian Aker
  • Date: 2010-08-12 17:19:46 UTC
  • mfrom: (1701.1.1 turn-off-csv)
  • Revision ID: brian@tangent.org-20100812171946-n44naaqhg27gehlh
MErge Monty, remove CSV from auto-build

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All tread must acquire
52
52
 
53
53
*/
54
54
 
55
 
#include <config.h>
56
 
#include <drizzled/internal/my_sys.h>
57
 
#include <drizzled/internal/thread_var.h>
58
 
#include <drizzled/statistics_variables.h>
59
 
#include <drizzled/pthread_globals.h>
60
 
 
61
 
#include <drizzled/session.h>
 
55
#include "config.h"
 
56
#include "drizzled/internal/my_sys.h"
 
57
#include "drizzled/internal/thread_var.h"
 
58
#include "drizzled/statistics_variables.h"
62
59
 
63
60
#include "thr_lock.h"
64
 
#include <drizzled/internal/m_string.h>
 
61
#include "drizzled/internal/m_string.h"
65
62
#include <errno.h>
66
63
#include <list>
67
64
 
78
75
 
79
76
#include <drizzled/util/test.h>
80
77
 
81
 
#include <boost/interprocess/sync/lock_options.hpp>
82
 
 
83
78
using namespace std;
84
79
 
85
80
namespace drizzled
89
84
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
90
85
 
91
86
 
92
 
uint64_t max_write_lock_count= UINT64_MAX;
 
87
uint64_t max_write_lock_count= ~(uint64_t) 0L;
93
88
 
94
89
/*
95
90
** For the future (now the thread specific cond is alloced by my_pthread.c)
117
112
void THR_LOCK_INFO::init()
118
113
{
119
114
  internal::st_my_thread_var *tmp= my_thread_var;
 
115
  thread= tmp->pthread_self;
120
116
  thread_id= tmp->id;
121
117
  n_cursors= 0;
122
118
}
147
143
static void wake_up_waiters(THR_LOCK *lock);
148
144
 
149
145
 
150
 
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
 
146
static enum enum_thr_lock_result wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, bool in_wait_list)
151
147
{
152
 
  internal::st_my_thread_var *thread_var= session.getThreadVar();
153
 
 
154
 
  boost::condition_variable_any *cond= &thread_var->suspend;
 
148
  internal::st_my_thread_var *thread_var= my_thread_var;
 
149
  pthread_cond_t *cond= &thread_var->suspend;
 
150
  struct timespec wait_timeout;
155
151
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
156
152
  bool can_deadlock= test(data->owner->info->n_cursors);
157
153
 
 
154
  if (!in_wait_list)
158
155
  {
159
156
    (*wait->last)=data;                         /* Wait for lock */
160
157
    data->prev= wait->last;
165
162
 
166
163
  /* Set up control struct to allow others to abort locks */
167
164
  thread_var->current_mutex= data->lock->native_handle();
168
 
  thread_var->current_cond=  &thread_var->suspend;
169
 
  data->cond= &thread_var->suspend;;
 
165
  thread_var->current_cond=  cond;
 
166
  data->cond= cond;
170
167
 
171
 
  while (not thread_var->abort)
 
168
  if (can_deadlock)
 
169
    set_timespec(wait_timeout, table_lock_wait_timeout);
 
170
  while (!thread_var->abort || in_wait_list)
172
171
  {
173
 
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
174
 
 
175
 
    if (can_deadlock)
176
 
    {
177
 
      boost::xtime xt; 
178
 
      xtime_get(&xt, boost::TIME_UTC); 
179
 
      xt.sec += table_lock_wait_timeout; 
180
 
      if (not cond->timed_wait(scoped, xt))
181
 
      {
182
 
        result= THR_LOCK_WAIT_TIMEOUT;
183
 
        scoped.release();
184
 
        break;
185
 
      }
186
 
    }
187
 
    else
188
 
    {
189
 
      cond->wait(scoped);
190
 
    }
 
172
    int rc= (can_deadlock ?
 
173
             pthread_cond_timedwait(cond, data->lock->native_handle(),
 
174
                                    &wait_timeout) :
 
175
             pthread_cond_wait(cond, data->lock->native_handle()));
191
176
    /*
192
177
      We must break the wait if one of the following occurs:
193
178
      - the connection has been aborted (!thread_var->abort), but
201
186
      Order of checks below is important to not report about timeout
202
187
      if the predicate is true.
203
188
    */
204
 
    if (data->cond == NULL)
205
 
    {
206
 
      scoped.release();
207
 
      break;
208
 
    }
209
 
    scoped.release();
 
189
    if (data->cond == 0)
 
190
    {
 
191
      break;
 
192
    }
 
193
    if (rc == ETIMEDOUT || rc == ETIME)
 
194
    {
 
195
      result= THR_LOCK_WAIT_TIMEOUT;
 
196
      break;
 
197
    }
210
198
  }
211
199
  if (data->cond || data->type == TL_UNLOCK)
212
200
  {
227
215
  data->lock->unlock();
228
216
 
229
217
  /* The following must be done after unlock of lock->mutex */
230
 
  boost_unique_lock_t scopedLock(thread_var->mutex);
 
218
  pthread_mutex_lock(&thread_var->mutex);
231
219
  thread_var->current_mutex= NULL;
232
220
  thread_var->current_cond= NULL;
 
221
  pthread_mutex_unlock(&thread_var->mutex);
233
222
  return(result);
234
223
}
235
224
 
236
225
 
237
 
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
 
226
static enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
238
227
{
239
228
  THR_LOCK *lock= data->lock;
240
229
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
382
371
  }
383
372
 
384
373
  /* Can't get lock yet;  Wait for it */
385
 
  return(wait_for_lock(session, wait_queue, data));
 
374
  return(wait_for_lock(wait_queue, data, 0));
386
375
end:
387
376
  lock->unlock();
388
377
 
404
393
 
405
394
  do
406
395
  {
407
 
    boost::condition_variable_any *cond= data->cond;
 
396
    pthread_cond_t *cond=data->cond;
408
397
    if ((int) data->type == (int) TL_READ_NO_INSERT)
409
398
    {
410
399
      if (using_concurrent_insert)
424
413
      }
425
414
      lock->read_no_write_count++;
426
415
    }
427
 
    data->cond= NULL;                           /* Mark thread free */
428
 
    cond->notify_one();
 
416
    data->cond=0;                               /* Mark thread free */
 
417
    pthread_cond_signal(cond);
429
418
  } while ((data=data->next));
430
419
  *lock->read_wait.last=0;
431
420
  if (!lock->read_wait.data)
502
491
          lock->write.last= &data->next;
503
492
 
504
493
          {
505
 
            boost::condition_variable_any *cond= data->cond;
506
 
            data->cond= NULL;                           /* Mark thread free */
507
 
            cond->notify_one(); /* Start waiting thred */
 
494
            pthread_cond_t *cond=data->cond;
 
495
            data->cond=0;                               /* Mark thread free */
 
496
            pthread_cond_signal(cond);  /* Start waiting thread */
508
497
          }
509
498
          if (data->type != TL_WRITE_ALLOW_WRITE ||
510
499
              !lock->write_wait.data ||
529
518
              !lock->read_no_write_count))
530
519
    {
531
520
      do {
532
 
        boost::condition_variable_any *cond= data->cond;
 
521
        pthread_cond_t *cond=data->cond;
533
522
        if (((*data->prev)=data->next))         /* remove from wait-list */
534
523
          data->next->prev= data->prev;
535
524
        else
538
527
        data->prev=lock->write.last;
539
528
        lock->write.last= &data->next;
540
529
        data->next=0;                           /* Only one write lock */
541
 
        data->cond= NULL;                               /* Mark thread free */
542
 
        cond->notify_one(); /* Start waiting thread */
 
530
        data->cond=0;                           /* Mark thread free */
 
531
        pthread_cond_signal(cond);      /* Start waiting thread */
543
532
      } while (lock_type == TL_WRITE_ALLOW_WRITE &&
544
533
               (data=lock->write_wait.data) &&
545
534
               data->type == TL_WRITE_ALLOW_WRITE);
589
578
 
590
579
 
591
580
enum enum_thr_lock_result
592
 
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
 
581
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
593
582
{
594
583
  THR_LOCK_DATA **pos,**end;
595
584
  if (count > 1)
597
586
  /* lock everything */
598
587
  for (pos=data,end=data+count; pos < end ; pos++)
599
588
  {
600
 
    enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
 
589
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
601
590
    if (result != THR_LOCK_SUCCESS)
602
591
    {                                           /* Aborted */
603
592
      thr_multi_unlock(data,(uint32_t) (pos-data));
638
627
  return;
639
628
}
640
629
 
641
 
void DrizzleLock::unlock(uint32_t count)
642
 
{
643
 
  THR_LOCK_DATA **pos,**end;
644
 
 
645
 
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
646
 
  {
647
 
    if ((*pos)->type != TL_UNLOCK)
648
 
      thr_unlock(*pos);
649
 
  }
650
 
}
651
 
 
652
630
/*
653
631
  Abort all threads waiting for a lock. The lock will be upgraded to
654
632
  TL_WRITE_ONLY to abort any new accesses to the lock
656
634
 
657
635
void THR_LOCK::abort_locks()
658
636
{
659
 
  boost_unique_lock_t scopedLock(mutex);
 
637
  pthread_mutex_lock(&mutex);
660
638
 
661
639
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
662
640
  {
663
641
    local_data->type= TL_UNLOCK;                        /* Mark killed */
664
642
    /* It's safe to signal the cond first: we're still holding the mutex. */
665
 
    local_data->cond->notify_one();
 
643
    pthread_cond_signal(local_data->cond);
666
644
    local_data->cond= NULL;                             /* Removed from list */
667
645
  }
668
646
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
669
647
  {
670
648
    local_data->type= TL_UNLOCK;
671
 
    local_data->cond->notify_one();
 
649
    pthread_cond_signal(local_data->cond);
672
650
    local_data->cond= NULL;
673
651
  }
674
652
  read_wait.last= &read_wait.data;
676
654
  read_wait.data= write_wait.data=0;
677
655
  if (write.data)
678
656
    write.data->type=TL_WRITE_ONLY;
 
657
  pthread_mutex_unlock(&mutex);
679
658
}
680
659
 
681
660
 
689
668
{
690
669
  bool found= false;
691
670
 
692
 
  boost_unique_lock_t scopedLock(mutex);
 
671
  pthread_mutex_lock(&mutex);
693
672
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
694
673
  {
695
674
    if (local_data->owner->info->thread_id == thread_id_arg)
697
676
      local_data->type= TL_UNLOCK;                      /* Mark killed */
698
677
      /* It's safe to signal the cond first: we're still holding the mutex. */
699
678
      found= true;
700
 
      local_data->cond->notify_one();
 
679
      pthread_cond_signal(local_data->cond);
701
680
      local_data->cond= 0;                              /* Removed from list */
702
681
 
703
682
      if (((*local_data->prev)= local_data->next))
712
691
    {
713
692
      local_data->type= TL_UNLOCK;
714
693
      found= true;
715
 
      local_data->cond->notify_one();
 
694
      pthread_cond_signal(local_data->cond);
716
695
      local_data->cond= NULL;
717
696
 
718
697
      if (((*local_data->prev)= local_data->next))
722
701
    }
723
702
  }
724
703
  wake_up_waiters(this);
 
704
  pthread_mutex_unlock(&mutex);
725
705
 
726
706
  return found;
727
707
}