~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/thr_lock.cc

  • Committer: Olaf van der Spek
  • Date: 2011-02-12 18:24:24 UTC
  • mto: (2167.1.2 build) (2172.1.4 build)
  • mto: This revision was merged to the branch mainline in revision 2168.
  • Revision ID: olafvdspek@gmail.com-20110212182424-kgnm9osi7qo97at2
casts

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All tread must acquire
46
46
should put a pointer to the following functions in the lock structure:
47
47
(If the pointer is zero (default), the function is not called)
48
48
 
49
 
check_status:
50
 
         Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
51
 
         we check if this function exists and returns 0.
52
 
         If not, then the lock is upgraded to TL_WRITE_LOCK
53
 
         In MyISAM this is a simple check if the insert can be done
54
 
         at the end of the datafile.
55
 
update_status:
56
 
        Before a write lock is released, this function is called.
57
 
        In MyISAM this functions updates the count and length of the datafile
58
 
get_status:
59
 
        When one gets a lock this functions is called.
60
 
        In MyISAM this stores the number of rows and size of the datafile
61
 
        for concurrent reads.
62
49
 
63
50
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
64
51
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
67
54
 
68
55
#include "config.h"
69
56
#include "drizzled/internal/my_sys.h"
 
57
#include "drizzled/internal/thread_var.h"
 
58
#include "drizzled/statistics_variables.h"
 
59
#include "drizzled/pthread_globals.h"
 
60
 
 
61
#include "drizzled/session.h"
70
62
 
71
63
#include "thr_lock.h"
72
64
#include "drizzled/internal/m_string.h"
86
78
 
87
79
#include <drizzled/util/test.h>
88
80
 
 
81
#include <boost/interprocess/sync/lock_options.hpp>
 
82
 
89
83
using namespace std;
90
84
 
91
85
namespace drizzled
92
86
{
93
87
 
94
 
bool thr_lock_inited= false;
95
 
uint32_t locks_immediate = 0L, locks_waited = 0L;
96
88
uint64_t table_lock_wait_timeout;
97
89
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
98
90
 
99
91
 
100
 
static list<THR_LOCK *> thr_lock_thread_list;          /* List of threads in use */
101
 
 
102
 
uint64_t max_write_lock_count= ~(uint64_t) 0L;
103
 
 
104
 
static inline pthread_cond_t *get_cond(void)
105
 
{
106
 
  return &my_thread_var->suspend;
107
 
}
 
92
uint64_t max_write_lock_count= UINT64_MAX;
 
93
 
 
94
void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count);
108
95
 
109
96
/*
110
97
** For the future (now the thread specific cond is alloced by my_pthread.c)
111
98
*/
112
99
 
113
 
bool init_thr_lock()
114
 
{
115
 
  thr_lock_inited= true;
116
 
 
117
 
  return false;
118
 
}
119
 
 
120
100
static inline bool
121
101
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
122
102
{
128
108
 
129
109
void thr_lock_init(THR_LOCK *lock)
130
110
{
131
 
  memset(lock, 0, sizeof(*lock));
132
 
  pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
 
111
  lock->init();
133
112
  lock->read.last= &lock->read.data;
134
113
  lock->read_wait.last= &lock->read_wait.data;
135
114
  lock->write_wait.last= &lock->write_wait.data;
136
115
  lock->write.last= &lock->write.data;
137
 
 
138
 
  pthread_mutex_lock(&internal::THR_LOCK_lock);         /* Add to locks in use */
139
 
  thr_lock_thread_list.push_front(lock);
140
 
  pthread_mutex_unlock(&internal::THR_LOCK_lock);
141
 
}
142
 
 
143
 
 
144
 
void thr_lock_delete(THR_LOCK *lock)
145
 
{
146
 
  pthread_mutex_destroy(&lock->mutex);
147
 
  pthread_mutex_lock(&internal::THR_LOCK_lock);
148
 
  thr_lock_thread_list.remove(lock);
149
 
  pthread_mutex_unlock(&internal::THR_LOCK_lock);
150
 
}
151
 
 
152
 
 
153
 
void thr_lock_info_init(THR_LOCK_INFO *info)
 
116
}
 
117
 
 
118
 
 
119
void THR_LOCK_INFO::init()
154
120
{
155
121
  internal::st_my_thread_var *tmp= my_thread_var;
156
 
  info->thread= tmp->pthread_self;
157
 
  info->thread_id= tmp->id;
158
 
  info->n_cursors= 0;
 
122
  thread_id= tmp->id;
 
123
  n_cursors= 0;
159
124
}
160
125
 
161
126
        /* Initialize a lock instance */
162
127
 
163
 
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
 
128
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
164
129
{
165
 
  data->lock= lock;
166
 
  data->type= TL_UNLOCK;
167
 
  data->owner= NULL;                               /* no owner yet */
168
 
  data->status_param= param;
169
 
  data->cond= NULL;
 
130
  lock= lock_arg;
 
131
  type= TL_UNLOCK;
 
132
  owner= NULL;                               /* no owner yet */
 
133
  status_param= param_arg;
 
134
  cond= NULL;
170
135
}
171
136
 
172
137
 
184
149
static void wake_up_waiters(THR_LOCK *lock);
185
150
 
186
151
 
187
 
static enum enum_thr_lock_result
188
 
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
189
 
              bool in_wait_list)
 
152
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
190
153
{
191
 
  internal::st_my_thread_var *thread_var= my_thread_var;
192
 
  pthread_cond_t *cond= &thread_var->suspend;
193
 
  struct timespec wait_timeout;
 
154
  internal::st_my_thread_var *thread_var= session.getThreadVar();
 
155
 
 
156
  boost::condition_variable_any *cond= &thread_var->suspend;
194
157
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
195
158
  bool can_deadlock= test(data->owner->info->n_cursors);
196
159
 
197
 
  if (!in_wait_list)
198
160
  {
199
161
    (*wait->last)=data;                         /* Wait for lock */
200
162
    data->prev= wait->last;
201
163
    wait->last= &data->next;
202
164
  }
203
165
 
204
 
  statistic_increment(locks_waited, &internal::THR_LOCK_lock);
 
166
  current_global_counters.locks_waited++;
205
167
 
206
168
  /* Set up control struct to allow others to abort locks */
207
 
  thread_var->current_mutex= &data->lock->mutex;
208
 
  thread_var->current_cond=  cond;
209
 
  data->cond= cond;
 
169
  thread_var->current_mutex= data->lock->native_handle();
 
170
  thread_var->current_cond=  &thread_var->suspend;
 
171
  data->cond= &thread_var->suspend;;
210
172
 
211
 
  if (can_deadlock)
212
 
    set_timespec(wait_timeout, table_lock_wait_timeout);
213
 
  while (!thread_var->abort || in_wait_list)
 
173
  while (not thread_var->abort)
214
174
  {
215
 
    int rc= (can_deadlock ?
216
 
             pthread_cond_timedwait(cond, &data->lock->mutex,
217
 
                                    &wait_timeout) :
218
 
             pthread_cond_wait(cond, &data->lock->mutex));
 
175
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
 
176
 
 
177
    if (can_deadlock)
 
178
    {
 
179
      boost::xtime xt; 
 
180
      xtime_get(&xt, boost::TIME_UTC); 
 
181
      xt.sec += table_lock_wait_timeout; 
 
182
      if (not cond->timed_wait(scoped, xt))
 
183
      {
 
184
        result= THR_LOCK_WAIT_TIMEOUT;
 
185
        scoped.release();
 
186
        break;
 
187
      }
 
188
    }
 
189
    else
 
190
    {
 
191
      cond->wait(scoped);
 
192
    }
219
193
    /*
220
194
      We must break the wait if one of the following occurs:
221
195
      - the connection has been aborted (!thread_var->abort), but
229
203
      Order of checks below is important to not report about timeout
230
204
      if the predicate is true.
231
205
    */
232
 
    if (data->cond == 0)
233
 
    {
234
 
      break;
235
 
    }
236
 
    if (rc == ETIMEDOUT || rc == ETIME)
237
 
    {
238
 
      result= THR_LOCK_WAIT_TIMEOUT;
239
 
      break;
240
 
    }
 
206
    if (data->cond == NULL)
 
207
    {
 
208
      scoped.release();
 
209
      break;
 
210
    }
 
211
    scoped.release();
241
212
  }
242
213
  if (data->cond || data->type == TL_UNLOCK)
243
214
  {
254
225
  else
255
226
  {
256
227
    result= THR_LOCK_SUCCESS;
257
 
    if (data->lock->get_status)
258
 
      (*data->lock->get_status)(data->status_param, 0);
259
228
  }
260
 
  pthread_mutex_unlock(&data->lock->mutex);
 
229
  data->lock->unlock();
261
230
 
262
231
  /* The following must be done after unlock of lock->mutex */
263
 
  pthread_mutex_lock(&thread_var->mutex);
 
232
  boost_unique_lock_t scopedLock(thread_var->mutex);
264
233
  thread_var->current_mutex= NULL;
265
234
  thread_var->current_cond= NULL;
266
 
  pthread_mutex_unlock(&thread_var->mutex);
267
235
  return(result);
268
236
}
269
237
 
270
238
 
271
 
static enum enum_thr_lock_result
272
 
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
273
 
         enum thr_lock_type lock_type)
 
239
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
274
240
{
275
 
  THR_LOCK *lock=data->lock;
 
241
  THR_LOCK *lock= data->lock;
276
242
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
277
243
  struct st_lock_list *wait_queue;
278
244
  THR_LOCK_DATA *lock_owner;
281
247
  data->cond=0;                                 /* safety */
282
248
  data->type=lock_type;
283
249
  data->owner= owner;                           /* Must be reset ! */
284
 
  pthread_mutex_lock(&lock->mutex);
 
250
  lock->lock();
285
251
  if ((int) lock_type <= (int) TL_READ_NO_INSERT)
286
252
  {
287
253
    /* Request for READ lock */
307
273
        lock->read.last= &data->next;
308
274
        if (lock_type == TL_READ_NO_INSERT)
309
275
          lock->read_no_write_count++;
310
 
        if (lock->get_status)
311
 
          (*lock->get_status)(data->status_param, 0);
312
 
        statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
 
276
        current_global_counters.locks_immediate++;
313
277
        goto end;
314
278
      }
315
279
      if (lock->write.data->type == TL_WRITE_ONLY)
327
291
      (*lock->read.last)=data;                  /* Add to running FIFO */
328
292
      data->prev=lock->read.last;
329
293
      lock->read.last= &data->next;
330
 
      if (lock->get_status)
331
 
        (*lock->get_status)(data->status_param, 0);
332
294
      if (lock_type == TL_READ_NO_INSERT)
333
295
        lock->read_no_write_count++;
334
 
      statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
 
296
      current_global_counters.locks_immediate++;
335
297
      goto end;
336
298
    }
337
299
    /*
343
305
  }
344
306
  else                                          /* Request for WRITE lock */
345
307
  {
346
 
    if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
 
308
    if (lock_type == TL_WRITE_CONCURRENT_INSERT)
347
309
      data->type=lock_type= thr_upgraded_concurrent_insert_lock;
348
310
 
349
311
    if (lock->write.data)                       /* If there is a write lock */
378
340
        (*lock->write.last)=data;       /* Add to running fifo */
379
341
        data->prev=lock->write.last;
380
342
        lock->write.last= &data->next;
381
 
        if (data->lock->get_status)
382
 
          (*data->lock->get_status)(data->status_param, 0);
383
 
        statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
 
343
        current_global_counters.locks_immediate++;
384
344
        goto end;
385
345
      }
386
346
    }
392
352
        if (lock_type == TL_WRITE_CONCURRENT_INSERT)
393
353
        {
394
354
          concurrent_insert= 1;
395
 
          if ((*lock->check_status)(data->status_param))
396
 
          {
397
 
            concurrent_insert= 0;
398
 
            data->type=lock_type= thr_upgraded_concurrent_insert_lock;
399
 
          }
400
355
        }
401
356
 
402
357
        if (!lock->read.data ||
408
363
          (*lock->write.last)=data;             /* Add as current write lock */
409
364
          data->prev=lock->write.last;
410
365
          lock->write.last= &data->next;
411
 
          if (data->lock->get_status)
412
 
            (*data->lock->get_status)(data->status_param, concurrent_insert);
413
 
          statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
 
366
          current_global_counters.locks_immediate++;
414
367
          goto end;
415
368
        }
416
369
      }
429
382
    result= THR_LOCK_DEADLOCK;
430
383
    goto end;
431
384
  }
 
385
 
432
386
  /* Can't get lock yet;  Wait for it */
433
 
  return(wait_for_lock(wait_queue, data, 0));
 
387
  return(wait_for_lock(session, wait_queue, data));
434
388
end:
435
 
  pthread_mutex_unlock(&lock->mutex);
 
389
  lock->unlock();
 
390
 
436
391
  return(result);
437
392
}
438
393
 
439
394
 
440
395
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
441
396
{
442
 
  THR_LOCK_DATA *data=lock->read_wait.data;
 
397
  THR_LOCK_DATA *data= lock->read_wait.data;
443
398
 
444
399
  /* move all locks from read_wait list to read list */
445
400
  (*lock->read.last)=data;
451
406
 
452
407
  do
453
408
  {
454
 
    pthread_cond_t *cond=data->cond;
 
409
    boost::condition_variable_any *cond= data->cond;
455
410
    if ((int) data->type == (int) TL_READ_NO_INSERT)
456
411
    {
457
412
      if (using_concurrent_insert)
471
426
      }
472
427
      lock->read_no_write_count++;
473
428
    }
474
 
    data->cond=0;                               /* Mark thread free */
475
 
    pthread_cond_signal(cond);
 
429
    data->cond= NULL;                           /* Mark thread free */
 
430
    cond->notify_one();
476
431
  } while ((data=data->next));
477
432
  *lock->read_wait.last=0;
478
433
  if (!lock->read_wait.data)
479
434
    lock->write_lock_count=0;
480
435
}
481
436
 
482
 
        /* Unlock lock and free next thread on same lock */
 
437
/* Unlock lock and free next thread on same lock */
483
438
 
484
439
static void thr_unlock(THR_LOCK_DATA *data)
485
440
{
486
441
  THR_LOCK *lock=data->lock;
487
442
  enum thr_lock_type lock_type=data->type;
488
 
  pthread_mutex_lock(&lock->mutex);
 
443
  lock->lock();
489
444
 
490
445
  if (((*data->prev)=data->next))               /* remove from lock-list */
491
446
    data->next->prev= data->prev;
494
449
  else
495
450
    lock->write.last=data->prev;
496
451
  if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
497
 
  {
498
 
    if (lock->update_status)
499
 
      (*lock->update_status)(data->status_param);
500
 
  }
 
452
  { }
501
453
  else
502
 
  {
503
 
    if (lock->restore_status)
504
 
      (*lock->restore_status)(data->status_param);
505
 
  }
 
454
  { }
506
455
  if (lock_type == TL_READ_NO_INSERT)
507
456
    lock->read_no_write_count--;
508
457
  data->type=TL_UNLOCK;                         /* Mark unlocked */
509
458
  wake_up_waiters(lock);
510
 
  pthread_mutex_unlock(&lock->mutex);
511
 
  return;
 
459
  lock->unlock();
512
460
}
513
461
 
514
462
 
554
502
          data->prev=lock->write.last;
555
503
          data->next=0;
556
504
          lock->write.last= &data->next;
557
 
          if (data->type == TL_WRITE_CONCURRENT_INSERT &&
558
 
              (*lock->check_status)(data->status_param))
559
 
            data->type=TL_WRITE;                        /* Upgrade lock */
 
505
 
560
506
          {
561
 
            pthread_cond_t *cond=data->cond;
562
 
            data->cond=0;                               /* Mark thread free */
563
 
            pthread_cond_signal(cond);  /* Start waiting thread */
 
507
            boost::condition_variable_any *cond= data->cond;
 
508
            data->cond= NULL;                           /* Mark thread free */
 
509
            cond->notify_one(); /* Start waiting thred */
564
510
          }
565
511
          if (data->type != TL_WRITE_ALLOW_WRITE ||
566
512
              !lock->write_wait.data ||
584
530
               lock_type != TL_WRITE_ALLOW_WRITE) ||
585
531
              !lock->read_no_write_count))
586
532
    {
587
 
      /*
588
 
        For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
589
 
        start WRITE locks together with the READ locks
590
 
      */
591
 
      if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
592
 
          (*lock->check_status)(data->status_param))
593
 
      {
594
 
        data->type=TL_WRITE;                    /* Upgrade lock */
595
 
        if (lock->read_wait.data)
596
 
          free_all_read_locks(lock,0);
597
 
        goto end;
598
 
      }
599
533
      do {
600
 
        pthread_cond_t *cond=data->cond;
 
534
        boost::condition_variable_any *cond= data->cond;
601
535
        if (((*data->prev)=data->next))         /* remove from wait-list */
602
536
          data->next->prev= data->prev;
603
537
        else
606
540
        data->prev=lock->write.last;
607
541
        lock->write.last= &data->next;
608
542
        data->next=0;                           /* Only one write lock */
609
 
        data->cond=0;                           /* Mark thread free */
610
 
        pthread_cond_signal(cond);      /* Start waiting thread */
 
543
        data->cond= NULL;                               /* Mark thread free */
 
544
        cond->notify_one(); /* Start waiting thread */
611
545
      } while (lock_type == TL_WRITE_ALLOW_WRITE &&
612
546
               (data=lock->write_wait.data) &&
613
547
               data->type == TL_WRITE_ALLOW_WRITE);
617
551
                             lock_type == TL_WRITE_ALLOW_WRITE));
618
552
    }
619
553
    else if (!data && lock->read_wait.data)
 
554
    {
620
555
      free_all_read_locks(lock,0);
 
556
    }
621
557
  }
622
558
end:
623
559
  return;
655
591
 
656
592
 
657
593
enum enum_thr_lock_result
658
 
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
 
594
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
659
595
{
660
596
  THR_LOCK_DATA **pos,**end;
661
597
  if (count > 1)
663
599
  /* lock everything */
664
600
  for (pos=data,end=data+count; pos < end ; pos++)
665
601
  {
666
 
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
 
602
    enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
667
603
    if (result != THR_LOCK_SUCCESS)
668
604
    {                                           /* Aborted */
669
605
      thr_multi_unlock(data,(uint32_t) (pos-data));
683
619
    do
684
620
    {
685
621
      pos--;
686
 
      if (last_lock->lock == (*pos)->lock &&
687
 
          last_lock->lock->copy_status)
688
 
      {
689
 
        if (last_lock->type <= TL_READ_NO_INSERT)
690
 
        {
691
 
          THR_LOCK_DATA **read_lock;
692
 
          /*
693
 
            If we are locking the same table with read locks we must ensure
694
 
            that all tables share the status of the last write lock or
695
 
            the same read lock.
696
 
          */
697
 
          for (;
698
 
               (*pos)->type <= TL_READ_NO_INSERT &&
699
 
                 pos != data &&
700
 
                 pos[-1]->lock == (*pos)->lock ;
701
 
               pos--) ;
702
 
 
703
 
          read_lock = pos+1;
704
 
          do
705
 
          {
706
 
            (last_lock->lock->copy_status)((*read_lock)->status_param,
707
 
                                           (*pos)->status_param);
708
 
          } while (*(read_lock++) != last_lock);
709
 
          last_lock= (*pos);                    /* Point at last write lock */
710
 
        }
711
 
        else
712
 
          (*last_lock->lock->copy_status)((*pos)->status_param,
713
 
                                          last_lock->status_param);
714
 
      }
715
 
      else
716
 
        last_lock=(*pos);
 
622
      last_lock=(*pos);
717
623
    } while (pos != data);
718
624
  }
719
625
#endif
734
640
  return;
735
641
}
736
642
 
 
643
void DrizzleLock::unlock(uint32_t count)
 
644
{
 
645
  THR_LOCK_DATA **pos,**end;
 
646
 
 
647
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
 
648
  {
 
649
    if ((*pos)->type != TL_UNLOCK)
 
650
      thr_unlock(*pos);
 
651
  }
 
652
}
 
653
 
737
654
/*
738
655
  Abort all threads waiting for a lock. The lock will be upgraded to
739
656
  TL_WRITE_ONLY to abort any new accesses to the lock
740
657
*/
741
658
 
742
 
void thr_abort_locks(THR_LOCK *lock)
 
659
void THR_LOCK::abort_locks()
743
660
{
744
 
  THR_LOCK_DATA *data;
745
 
  pthread_mutex_lock(&lock->mutex);
 
661
  boost_unique_lock_t scopedLock(mutex);
746
662
 
747
 
  for (data=lock->read_wait.data; data ; data=data->next)
 
663
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
748
664
  {
749
 
    data->type= TL_UNLOCK;                      /* Mark killed */
 
665
    local_data->type= TL_UNLOCK;                        /* Mark killed */
750
666
    /* It's safe to signal the cond first: we're still holding the mutex. */
751
 
    pthread_cond_signal(data->cond);
752
 
    data->cond= NULL;                           /* Removed from list */
 
667
    local_data->cond->notify_one();
 
668
    local_data->cond= NULL;                             /* Removed from list */
753
669
  }
754
 
  for (data=lock->write_wait.data; data ; data=data->next)
 
670
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
755
671
  {
756
 
    data->type=TL_UNLOCK;
757
 
    pthread_cond_signal(data->cond);
758
 
    data->cond= NULL;
 
672
    local_data->type= TL_UNLOCK;
 
673
    local_data->cond->notify_one();
 
674
    local_data->cond= NULL;
759
675
  }
760
 
  lock->read_wait.last= &lock->read_wait.data;
761
 
  lock->write_wait.last= &lock->write_wait.data;
762
 
  lock->read_wait.data=lock->write_wait.data=0;
763
 
  if (lock->write.data)
764
 
    lock->write.data->type=TL_WRITE_ONLY;
765
 
  pthread_mutex_unlock(&lock->mutex);
766
 
  return;
 
676
  read_wait.last= &read_wait.data;
 
677
  write_wait.last= &write_wait.data;
 
678
  read_wait.data= write_wait.data=0;
 
679
  if (write.data)
 
680
    write.data->type=TL_WRITE_ONLY;
767
681
}
768
682
 
769
683
 
773
687
  This is used to abort all locks for a specific thread
774
688
*/
775
689
 
776
 
bool thr_abort_locks_for_thread(THR_LOCK *lock, uint64_t thread_id)
 
690
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
777
691
{
778
 
  THR_LOCK_DATA *data;
779
692
  bool found= false;
780
693
 
781
 
  pthread_mutex_lock(&lock->mutex);
782
 
  for (data= lock->read_wait.data; data ; data= data->next)
 
694
  boost_unique_lock_t scopedLock(mutex);
 
695
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
783
696
  {
784
 
    if (data->owner->info->thread_id == thread_id)
 
697
    if (local_data->owner->info->thread_id == thread_id_arg)
785
698
    {
786
 
      data->type= TL_UNLOCK;                    /* Mark killed */
 
699
      local_data->type= TL_UNLOCK;                      /* Mark killed */
787
700
      /* It's safe to signal the cond first: we're still holding the mutex. */
788
701
      found= true;
789
 
      pthread_cond_signal(data->cond);
790
 
      data->cond= 0;                            /* Removed from list */
 
702
      local_data->cond->notify_one();
 
703
      local_data->cond= 0;                              /* Removed from list */
791
704
 
792
 
      if (((*data->prev)= data->next))
793
 
        data->next->prev= data->prev;
 
705
      if (((*local_data->prev)= local_data->next))
 
706
        local_data->next->prev= local_data->prev;
794
707
      else
795
 
        lock->read_wait.last= data->prev;
 
708
        read_wait.last= local_data->prev;
796
709
    }
797
710
  }
798
 
  for (data= lock->write_wait.data; data ; data= data->next)
 
711
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
799
712
  {
800
 
    if (data->owner->info->thread_id == thread_id)
 
713
    if (local_data->owner->info->thread_id == thread_id_arg)
801
714
    {
802
 
      data->type= TL_UNLOCK;
 
715
      local_data->type= TL_UNLOCK;
803
716
      found= true;
804
 
      pthread_cond_signal(data->cond);
805
 
      data->cond= NULL;
 
717
      local_data->cond->notify_one();
 
718
      local_data->cond= NULL;
806
719
 
807
 
      if (((*data->prev)= data->next))
808
 
        data->next->prev= data->prev;
 
720
      if (((*local_data->prev)= local_data->next))
 
721
        local_data->next->prev= local_data->prev;
809
722
      else
810
 
        lock->write_wait.last= data->prev;
 
723
        write_wait.last= local_data->prev;
811
724
    }
812
725
  }
813
 
  wake_up_waiters(lock);
814
 
  pthread_mutex_unlock(&lock->mutex);
815
 
  return(found);
 
726
  wake_up_waiters(this);
 
727
 
 
728
  return found;
816
729
}
817
730
 
818
731
} /* namespace drizzled */