~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/thr_lock.cc

  • Committer: Monty Taylor
  • Date: 2010-07-09 14:06:50 UTC
  • Revision ID: mordred@inaugust.com-20100709140650-ojeih829v3wbdkyv
Added include guard for generator.h to make cpplint happy.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All tread must acquire
46
46
should put a pointer to the following functions in the lock structure:
47
47
(If the pointer is zero (default), the function is not called)
48
48
 
 
49
check_status:
 
50
         Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
 
51
         we check if this function exists and returns 0.
 
52
         If not, then the lock is upgraded to TL_WRITE_LOCK
 
53
         In MyISAM this is a simple check if the insert can be done
 
54
         at the end of the datafile.
 
55
update_status:
 
56
        Before a write lock is released, this function is called.
 
57
        In MyISAM this functions updates the count and length of the datafile
 
58
get_status:
 
59
        When one gets a lock this functions is called.
 
60
        In MyISAM this stores the number of rows and size of the datafile
 
61
        for concurrent reads.
49
62
 
50
63
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
64
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
52
65
 
53
66
*/
54
67
 
55
 
#include <config.h>
56
 
#include <drizzled/internal/my_sys.h>
57
 
#include <drizzled/internal/thread_var.h>
58
 
#include <drizzled/statistics_variables.h>
59
 
#include <drizzled/pthread_globals.h>
60
 
 
61
 
#include <drizzled/session.h>
 
68
#include "config.h"
 
69
#include "drizzled/internal/my_sys.h"
 
70
#include "drizzled/statistics_variables.h"
62
71
 
63
72
#include "thr_lock.h"
64
 
#include <drizzled/internal/m_string.h>
 
73
#include "drizzled/internal/m_string.h"
65
74
#include <errno.h>
66
75
#include <list>
67
76
 
78
87
 
79
88
#include <drizzled/util/test.h>
80
89
 
81
 
#include <boost/interprocess/sync/lock_options.hpp>
82
 
 
83
90
using namespace std;
84
91
 
85
92
namespace drizzled
86
93
{
87
94
 
 
95
bool thr_lock_inited= false;
88
96
uint64_t table_lock_wait_timeout;
89
97
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
90
98
 
91
99
 
92
 
uint64_t max_write_lock_count= UINT64_MAX;
 
100
static list<THR_LOCK *> thr_lock_thread_list;          /* List of threads in use */
 
101
 
 
102
uint64_t max_write_lock_count= ~(uint64_t) 0L;
 
103
 
 
104
static inline pthread_cond_t *get_cond(void)
 
105
{
 
106
  return &my_thread_var->suspend;
 
107
}
93
108
 
94
109
/*
95
110
** For the future (now the thread specific cond is alloced by my_pthread.c)
96
111
*/
97
112
 
 
113
bool init_thr_lock()
 
114
{
 
115
  thr_lock_inited= true;
 
116
 
 
117
  return false;
 
118
}
 
119
 
98
120
static inline bool
99
121
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
100
122
{
106
128
 
107
129
void thr_lock_init(THR_LOCK *lock)
108
130
{
109
 
  lock->init();
 
131
  memset(lock, 0, sizeof(*lock));
 
132
  pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
110
133
  lock->read.last= &lock->read.data;
111
134
  lock->read_wait.last= &lock->read_wait.data;
112
135
  lock->write_wait.last= &lock->write_wait.data;
113
136
  lock->write.last= &lock->write.data;
114
 
}
115
 
 
116
 
 
117
 
void THR_LOCK_INFO::init()
 
137
 
 
138
  pthread_mutex_lock(&internal::THR_LOCK_lock);         /* Add to locks in use */
 
139
  thr_lock_thread_list.push_front(lock);
 
140
  pthread_mutex_unlock(&internal::THR_LOCK_lock);
 
141
}
 
142
 
 
143
 
 
144
void thr_lock_delete(THR_LOCK *lock)
 
145
{
 
146
  pthread_mutex_destroy(&lock->mutex);
 
147
  pthread_mutex_lock(&internal::THR_LOCK_lock);
 
148
  thr_lock_thread_list.remove(lock);
 
149
  pthread_mutex_unlock(&internal::THR_LOCK_lock);
 
150
}
 
151
 
 
152
 
 
153
void thr_lock_info_init(THR_LOCK_INFO *info)
118
154
{
119
155
  internal::st_my_thread_var *tmp= my_thread_var;
120
 
  thread_id= tmp->id;
121
 
  n_cursors= 0;
 
156
  info->thread= tmp->pthread_self;
 
157
  info->thread_id= tmp->id;
 
158
  info->n_cursors= 0;
122
159
}
123
160
 
124
161
        /* Initialize a lock instance */
125
162
 
126
 
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
 
163
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
127
164
{
128
 
  lock= lock_arg;
129
 
  type= TL_UNLOCK;
130
 
  owner= NULL;                               /* no owner yet */
131
 
  status_param= param_arg;
132
 
  cond= NULL;
 
165
  data->lock= lock;
 
166
  data->type= TL_UNLOCK;
 
167
  data->owner= NULL;                               /* no owner yet */
 
168
  data->status_param= param;
 
169
  data->cond= NULL;
133
170
}
134
171
 
135
172
 
147
184
static void wake_up_waiters(THR_LOCK *lock);
148
185
 
149
186
 
150
 
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
 
187
static enum enum_thr_lock_result
 
188
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
 
189
              bool in_wait_list)
151
190
{
152
 
  internal::st_my_thread_var *thread_var= session.getThreadVar();
153
 
 
154
 
  boost::condition_variable_any *cond= &thread_var->suspend;
 
191
  internal::st_my_thread_var *thread_var= my_thread_var;
 
192
  pthread_cond_t *cond= &thread_var->suspend;
 
193
  struct timespec wait_timeout;
155
194
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
156
195
  bool can_deadlock= test(data->owner->info->n_cursors);
157
196
 
 
197
  if (!in_wait_list)
158
198
  {
159
199
    (*wait->last)=data;                         /* Wait for lock */
160
200
    data->prev= wait->last;
161
201
    wait->last= &data->next;
162
202
  }
163
203
 
164
 
  current_global_counters.locks_waited++;
 
204
  status_var_increment(current_global_counters.locks_waited);
165
205
 
166
206
  /* Set up control struct to allow others to abort locks */
167
 
  thread_var->current_mutex= data->lock->native_handle();
168
 
  thread_var->current_cond=  &thread_var->suspend;
169
 
  data->cond= &thread_var->suspend;;
 
207
  thread_var->current_mutex= &data->lock->mutex;
 
208
  thread_var->current_cond=  cond;
 
209
  data->cond= cond;
170
210
 
171
 
  while (not thread_var->abort)
 
211
  if (can_deadlock)
 
212
    set_timespec(wait_timeout, table_lock_wait_timeout);
 
213
  while (!thread_var->abort || in_wait_list)
172
214
  {
173
 
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
174
 
 
175
 
    if (can_deadlock)
176
 
    {
177
 
      boost::xtime xt; 
178
 
      xtime_get(&xt, boost::TIME_UTC); 
179
 
      xt.sec += table_lock_wait_timeout; 
180
 
      if (not cond->timed_wait(scoped, xt))
181
 
      {
182
 
        result= THR_LOCK_WAIT_TIMEOUT;
183
 
        scoped.release();
184
 
        break;
185
 
      }
186
 
    }
187
 
    else
188
 
    {
189
 
      cond->wait(scoped);
190
 
    }
 
215
    int rc= (can_deadlock ?
 
216
             pthread_cond_timedwait(cond, &data->lock->mutex,
 
217
                                    &wait_timeout) :
 
218
             pthread_cond_wait(cond, &data->lock->mutex));
191
219
    /*
192
220
      We must break the wait if one of the following occurs:
193
221
      - the connection has been aborted (!thread_var->abort), but
201
229
      Order of checks below is important to not report about timeout
202
230
      if the predicate is true.
203
231
    */
204
 
    if (data->cond == NULL)
205
 
    {
206
 
      scoped.release();
207
 
      break;
208
 
    }
209
 
    scoped.release();
 
232
    if (data->cond == 0)
 
233
    {
 
234
      break;
 
235
    }
 
236
    if (rc == ETIMEDOUT || rc == ETIME)
 
237
    {
 
238
      result= THR_LOCK_WAIT_TIMEOUT;
 
239
      break;
 
240
    }
210
241
  }
211
242
  if (data->cond || data->type == TL_UNLOCK)
212
243
  {
223
254
  else
224
255
  {
225
256
    result= THR_LOCK_SUCCESS;
 
257
    if (data->lock->get_status)
 
258
      (*data->lock->get_status)(data->status_param, 0);
226
259
  }
227
 
  data->lock->unlock();
 
260
  pthread_mutex_unlock(&data->lock->mutex);
228
261
 
229
262
  /* The following must be done after unlock of lock->mutex */
230
 
  boost_unique_lock_t scopedLock(thread_var->mutex);
 
263
  pthread_mutex_lock(&thread_var->mutex);
231
264
  thread_var->current_mutex= NULL;
232
265
  thread_var->current_cond= NULL;
 
266
  pthread_mutex_unlock(&thread_var->mutex);
233
267
  return(result);
234
268
}
235
269
 
236
270
 
237
 
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
 
271
static enum enum_thr_lock_result
 
272
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
 
273
         enum thr_lock_type lock_type)
238
274
{
239
 
  THR_LOCK *lock= data->lock;
 
275
  THR_LOCK *lock=data->lock;
240
276
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
241
277
  struct st_lock_list *wait_queue;
242
278
  THR_LOCK_DATA *lock_owner;
245
281
  data->cond=0;                                 /* safety */
246
282
  data->type=lock_type;
247
283
  data->owner= owner;                           /* Must be reset ! */
248
 
  lock->lock();
 
284
  pthread_mutex_lock(&lock->mutex);
249
285
  if ((int) lock_type <= (int) TL_READ_NO_INSERT)
250
286
  {
251
287
    /* Request for READ lock */
271
307
        lock->read.last= &data->next;
272
308
        if (lock_type == TL_READ_NO_INSERT)
273
309
          lock->read_no_write_count++;
274
 
        current_global_counters.locks_immediate++;
 
310
        if (lock->get_status)
 
311
          (*lock->get_status)(data->status_param, 0);
 
312
        status_var_increment(current_global_counters.locks_immediate);
275
313
        goto end;
276
314
      }
277
315
      if (lock->write.data->type == TL_WRITE_ONLY)
289
327
      (*lock->read.last)=data;                  /* Add to running FIFO */
290
328
      data->prev=lock->read.last;
291
329
      lock->read.last= &data->next;
 
330
      if (lock->get_status)
 
331
        (*lock->get_status)(data->status_param, 0);
292
332
      if (lock_type == TL_READ_NO_INSERT)
293
333
        lock->read_no_write_count++;
294
 
      current_global_counters.locks_immediate++;
 
334
      status_var_increment(current_global_counters.locks_immediate);
295
335
      goto end;
296
336
    }
297
337
    /*
303
343
  }
304
344
  else                                          /* Request for WRITE lock */
305
345
  {
306
 
    if (lock_type == TL_WRITE_CONCURRENT_INSERT)
 
346
    if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
307
347
      data->type=lock_type= thr_upgraded_concurrent_insert_lock;
308
348
 
309
349
    if (lock->write.data)                       /* If there is a write lock */
338
378
        (*lock->write.last)=data;       /* Add to running fifo */
339
379
        data->prev=lock->write.last;
340
380
        lock->write.last= &data->next;
341
 
        current_global_counters.locks_immediate++;
 
381
        if (data->lock->get_status)
 
382
          (*data->lock->get_status)(data->status_param, 0);
 
383
        status_var_increment(current_global_counters.locks_immediate);
342
384
        goto end;
343
385
      }
344
386
    }
350
392
        if (lock_type == TL_WRITE_CONCURRENT_INSERT)
351
393
        {
352
394
          concurrent_insert= 1;
 
395
          if ((*lock->check_status)(data->status_param))
 
396
          {
 
397
            concurrent_insert= 0;
 
398
            data->type=lock_type= thr_upgraded_concurrent_insert_lock;
 
399
          }
353
400
        }
354
401
 
355
402
        if (!lock->read.data ||
361
408
          (*lock->write.last)=data;             /* Add as current write lock */
362
409
          data->prev=lock->write.last;
363
410
          lock->write.last= &data->next;
364
 
          current_global_counters.locks_immediate++;
 
411
          if (data->lock->get_status)
 
412
            (*data->lock->get_status)(data->status_param, concurrent_insert);
 
413
          status_var_increment(current_global_counters.locks_immediate);
365
414
          goto end;
366
415
        }
367
416
      }
380
429
    result= THR_LOCK_DEADLOCK;
381
430
    goto end;
382
431
  }
383
 
 
384
432
  /* Can't get lock yet;  Wait for it */
385
 
  return(wait_for_lock(session, wait_queue, data));
 
433
  return(wait_for_lock(wait_queue, data, 0));
386
434
end:
387
 
  lock->unlock();
388
 
 
 
435
  pthread_mutex_unlock(&lock->mutex);
389
436
  return(result);
390
437
}
391
438
 
392
439
 
393
440
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
394
441
{
395
 
  THR_LOCK_DATA *data= lock->read_wait.data;
 
442
  THR_LOCK_DATA *data=lock->read_wait.data;
396
443
 
397
444
  /* move all locks from read_wait list to read list */
398
445
  (*lock->read.last)=data;
404
451
 
405
452
  do
406
453
  {
407
 
    boost::condition_variable_any *cond= data->cond;
 
454
    pthread_cond_t *cond=data->cond;
408
455
    if ((int) data->type == (int) TL_READ_NO_INSERT)
409
456
    {
410
457
      if (using_concurrent_insert)
424
471
      }
425
472
      lock->read_no_write_count++;
426
473
    }
427
 
    data->cond= NULL;                           /* Mark thread free */
428
 
    cond->notify_one();
 
474
    data->cond=0;                               /* Mark thread free */
 
475
    pthread_cond_signal(cond);
429
476
  } while ((data=data->next));
430
477
  *lock->read_wait.last=0;
431
478
  if (!lock->read_wait.data)
432
479
    lock->write_lock_count=0;
433
480
}
434
481
 
435
 
/* Unlock lock and free next thread on same lock */
 
482
        /* Unlock lock and free next thread on same lock */
436
483
 
437
484
static void thr_unlock(THR_LOCK_DATA *data)
438
485
{
439
486
  THR_LOCK *lock=data->lock;
440
487
  enum thr_lock_type lock_type=data->type;
441
 
  lock->lock();
 
488
  pthread_mutex_lock(&lock->mutex);
442
489
 
443
490
  if (((*data->prev)=data->next))               /* remove from lock-list */
444
491
    data->next->prev= data->prev;
447
494
  else
448
495
    lock->write.last=data->prev;
449
496
  if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
450
 
  { }
 
497
  {
 
498
    if (lock->update_status)
 
499
      (*lock->update_status)(data->status_param);
 
500
  }
451
501
  else
452
 
  { }
 
502
  {
 
503
    if (lock->restore_status)
 
504
      (*lock->restore_status)(data->status_param);
 
505
  }
453
506
  if (lock_type == TL_READ_NO_INSERT)
454
507
    lock->read_no_write_count--;
455
508
  data->type=TL_UNLOCK;                         /* Mark unlocked */
456
509
  wake_up_waiters(lock);
457
 
  lock->unlock();
 
510
  pthread_mutex_unlock(&lock->mutex);
 
511
  return;
458
512
}
459
513
 
460
514
 
500
554
          data->prev=lock->write.last;
501
555
          data->next=0;
502
556
          lock->write.last= &data->next;
503
 
 
 
557
          if (data->type == TL_WRITE_CONCURRENT_INSERT &&
 
558
              (*lock->check_status)(data->status_param))
 
559
            data->type=TL_WRITE;                        /* Upgrade lock */
504
560
          {
505
 
            boost::condition_variable_any *cond= data->cond;
506
 
            data->cond= NULL;                           /* Mark thread free */
507
 
            cond->notify_one(); /* Start waiting thred */
 
561
            pthread_cond_t *cond=data->cond;
 
562
            data->cond=0;                               /* Mark thread free */
 
563
            pthread_cond_signal(cond);  /* Start waiting thread */
508
564
          }
509
565
          if (data->type != TL_WRITE_ALLOW_WRITE ||
510
566
              !lock->write_wait.data ||
528
584
               lock_type != TL_WRITE_ALLOW_WRITE) ||
529
585
              !lock->read_no_write_count))
530
586
    {
 
587
      /*
 
588
        For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
 
589
        start WRITE locks together with the READ locks
 
590
      */
 
591
      if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
 
592
          (*lock->check_status)(data->status_param))
 
593
      {
 
594
        data->type=TL_WRITE;                    /* Upgrade lock */
 
595
        if (lock->read_wait.data)
 
596
          free_all_read_locks(lock,0);
 
597
        goto end;
 
598
      }
531
599
      do {
532
 
        boost::condition_variable_any *cond= data->cond;
 
600
        pthread_cond_t *cond=data->cond;
533
601
        if (((*data->prev)=data->next))         /* remove from wait-list */
534
602
          data->next->prev= data->prev;
535
603
        else
538
606
        data->prev=lock->write.last;
539
607
        lock->write.last= &data->next;
540
608
        data->next=0;                           /* Only one write lock */
541
 
        data->cond= NULL;                               /* Mark thread free */
542
 
        cond->notify_one(); /* Start waiting thread */
 
609
        data->cond=0;                           /* Mark thread free */
 
610
        pthread_cond_signal(cond);      /* Start waiting thread */
543
611
      } while (lock_type == TL_WRITE_ALLOW_WRITE &&
544
612
               (data=lock->write_wait.data) &&
545
613
               data->type == TL_WRITE_ALLOW_WRITE);
549
617
                             lock_type == TL_WRITE_ALLOW_WRITE));
550
618
    }
551
619
    else if (!data && lock->read_wait.data)
552
 
    {
553
620
      free_all_read_locks(lock,0);
554
 
    }
555
621
  }
556
622
end:
557
623
  return;
589
655
 
590
656
 
591
657
enum enum_thr_lock_result
592
 
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
 
658
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
593
659
{
594
660
  THR_LOCK_DATA **pos,**end;
595
661
  if (count > 1)
597
663
  /* lock everything */
598
664
  for (pos=data,end=data+count; pos < end ; pos++)
599
665
  {
600
 
    enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
 
666
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
601
667
    if (result != THR_LOCK_SUCCESS)
602
668
    {                                           /* Aborted */
603
669
      thr_multi_unlock(data,(uint32_t) (pos-data));
617
683
    do
618
684
    {
619
685
      pos--;
620
 
      last_lock=(*pos);
 
686
      if (last_lock->lock == (*pos)->lock &&
 
687
          last_lock->lock->copy_status)
 
688
      {
 
689
        if (last_lock->type <= TL_READ_NO_INSERT)
 
690
        {
 
691
          THR_LOCK_DATA **read_lock;
 
692
          /*
 
693
            If we are locking the same table with read locks we must ensure
 
694
            that all tables share the status of the last write lock or
 
695
            the same read lock.
 
696
          */
 
697
          for (;
 
698
               (*pos)->type <= TL_READ_NO_INSERT &&
 
699
                 pos != data &&
 
700
                 pos[-1]->lock == (*pos)->lock ;
 
701
               pos--) ;
 
702
 
 
703
          read_lock = pos+1;
 
704
          do
 
705
          {
 
706
            (last_lock->lock->copy_status)((*read_lock)->status_param,
 
707
                                           (*pos)->status_param);
 
708
          } while (*(read_lock++) != last_lock);
 
709
          last_lock= (*pos);                    /* Point at last write lock */
 
710
        }
 
711
        else
 
712
          (*last_lock->lock->copy_status)((*pos)->status_param,
 
713
                                          last_lock->status_param);
 
714
      }
 
715
      else
 
716
        last_lock=(*pos);
621
717
    } while (pos != data);
622
718
  }
623
719
#endif
638
734
  return;
639
735
}
640
736
 
641
 
void DrizzleLock::unlock(uint32_t count)
642
 
{
643
 
  THR_LOCK_DATA **pos,**end;
644
 
 
645
 
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
646
 
  {
647
 
    if ((*pos)->type != TL_UNLOCK)
648
 
      thr_unlock(*pos);
649
 
  }
650
 
}
651
 
 
652
737
/*
653
738
  Abort all threads waiting for a lock. The lock will be upgraded to
654
739
  TL_WRITE_ONLY to abort any new accesses to the lock
655
740
*/
656
741
 
657
 
void THR_LOCK::abort_locks()
 
742
void thr_abort_locks(THR_LOCK *lock)
658
743
{
659
 
  boost_unique_lock_t scopedLock(mutex);
 
744
  THR_LOCK_DATA *data;
 
745
  pthread_mutex_lock(&lock->mutex);
660
746
 
661
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
747
  for (data=lock->read_wait.data; data ; data=data->next)
662
748
  {
663
 
    local_data->type= TL_UNLOCK;                        /* Mark killed */
 
749
    data->type= TL_UNLOCK;                      /* Mark killed */
664
750
    /* It's safe to signal the cond first: we're still holding the mutex. */
665
 
    local_data->cond->notify_one();
666
 
    local_data->cond= NULL;                             /* Removed from list */
 
751
    pthread_cond_signal(data->cond);
 
752
    data->cond= NULL;                           /* Removed from list */
667
753
  }
668
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
754
  for (data=lock->write_wait.data; data ; data=data->next)
669
755
  {
670
 
    local_data->type= TL_UNLOCK;
671
 
    local_data->cond->notify_one();
672
 
    local_data->cond= NULL;
 
756
    data->type=TL_UNLOCK;
 
757
    pthread_cond_signal(data->cond);
 
758
    data->cond= NULL;
673
759
  }
674
 
  read_wait.last= &read_wait.data;
675
 
  write_wait.last= &write_wait.data;
676
 
  read_wait.data= write_wait.data=0;
677
 
  if (write.data)
678
 
    write.data->type=TL_WRITE_ONLY;
 
760
  lock->read_wait.last= &lock->read_wait.data;
 
761
  lock->write_wait.last= &lock->write_wait.data;
 
762
  lock->read_wait.data=lock->write_wait.data=0;
 
763
  if (lock->write.data)
 
764
    lock->write.data->type=TL_WRITE_ONLY;
 
765
  pthread_mutex_unlock(&lock->mutex);
 
766
  return;
679
767
}
680
768
 
681
769
 
685
773
  This is used to abort all locks for a specific thread
686
774
*/
687
775
 
688
 
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
 
776
bool thr_abort_locks_for_thread(THR_LOCK *lock, uint64_t thread_id)
689
777
{
 
778
  THR_LOCK_DATA *data;
690
779
  bool found= false;
691
780
 
692
 
  boost_unique_lock_t scopedLock(mutex);
693
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
781
  pthread_mutex_lock(&lock->mutex);
 
782
  for (data= lock->read_wait.data; data ; data= data->next)
694
783
  {
695
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
784
    if (data->owner->info->thread_id == thread_id)
696
785
    {
697
 
      local_data->type= TL_UNLOCK;                      /* Mark killed */
 
786
      data->type= TL_UNLOCK;                    /* Mark killed */
698
787
      /* It's safe to signal the cond first: we're still holding the mutex. */
699
788
      found= true;
700
 
      local_data->cond->notify_one();
701
 
      local_data->cond= 0;                              /* Removed from list */
 
789
      pthread_cond_signal(data->cond);
 
790
      data->cond= 0;                            /* Removed from list */
702
791
 
703
 
      if (((*local_data->prev)= local_data->next))
704
 
        local_data->next->prev= local_data->prev;
 
792
      if (((*data->prev)= data->next))
 
793
        data->next->prev= data->prev;
705
794
      else
706
 
        read_wait.last= local_data->prev;
 
795
        lock->read_wait.last= data->prev;
707
796
    }
708
797
  }
709
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
798
  for (data= lock->write_wait.data; data ; data= data->next)
710
799
  {
711
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
800
    if (data->owner->info->thread_id == thread_id)
712
801
    {
713
 
      local_data->type= TL_UNLOCK;
 
802
      data->type= TL_UNLOCK;
714
803
      found= true;
715
 
      local_data->cond->notify_one();
716
 
      local_data->cond= NULL;
 
804
      pthread_cond_signal(data->cond);
 
805
      data->cond= NULL;
717
806
 
718
 
      if (((*local_data->prev)= local_data->next))
719
 
        local_data->next->prev= local_data->prev;
 
807
      if (((*data->prev)= data->next))
 
808
        data->next->prev= data->prev;
720
809
      else
721
 
        write_wait.last= local_data->prev;
 
810
        lock->write_wait.last= data->prev;
722
811
    }
723
812
  }
724
 
  wake_up_waiters(this);
725
 
 
726
 
  return found;
 
813
  wake_up_waiters(lock);
 
814
  pthread_mutex_unlock(&lock->mutex);
 
815
  return(found);
727
816
}
728
817
 
729
818
} /* namespace drizzled */