~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/thr_lock.cc

  • Committer: Stewart Smith
  • Date: 2010-03-18 12:01:34 UTC
  • mto: (1666.2.3 build)
  • mto: This revision was merged to the branch mainline in revision 1596.
  • Revision ID: stewart@flamingspork.com-20100318120134-45fdnsw8g3j6c7oy
move RAND() into a plugin

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All tread must acquire
46
46
should put a pointer to the following functions in the lock structure:
47
47
(If the pointer is zero (default), the function is not called)
48
48
 
 
49
check_status:
 
50
         Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
 
51
         we check if this function exists and returns 0.
 
52
         If not, then the lock is upgraded to TL_WRITE_LOCK
 
53
         In MyISAM this is a simple check if the insert can be done
 
54
         at the end of the datafile.
 
55
update_status:
 
56
        Before a write lock is released, this function is called.
 
57
        In MyISAM this functions updates the count and length of the datafile
 
58
get_status:
 
59
        When one gets a lock this functions is called.
 
60
        In MyISAM this stores the number of rows and size of the datafile
 
61
        for concurrent reads.
49
62
 
50
63
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
64
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
54
67
 
55
68
#include "config.h"
56
69
#include "drizzled/internal/my_sys.h"
57
 
#include "drizzled/internal/thread_var.h"
58
 
#include "drizzled/statistics_variables.h"
59
 
#include "drizzled/pthread_globals.h"
60
 
 
61
 
#include "drizzled/session.h"
62
70
 
63
71
#include "thr_lock.h"
64
72
#include "drizzled/internal/m_string.h"
78
86
 
79
87
#include <drizzled/util/test.h>
80
88
 
81
 
#include <boost/interprocess/sync/lock_options.hpp>
82
 
 
83
89
using namespace std;
84
90
 
85
91
namespace drizzled
86
92
{
87
93
 
 
94
bool thr_lock_inited= false;
 
95
uint32_t locks_immediate = 0L, locks_waited = 0L;
88
96
uint64_t table_lock_wait_timeout;
89
97
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
90
98
 
91
99
 
92
 
uint64_t max_write_lock_count= UINT64_MAX;
93
 
 
94
 
void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count);
 
100
static list<THR_LOCK *> thr_lock_thread_list;          /* List of threads in use */
 
101
 
 
102
uint64_t max_write_lock_count= ~(uint64_t) 0L;
 
103
 
 
104
static inline pthread_cond_t *get_cond(void)
 
105
{
 
106
  return &my_thread_var->suspend;
 
107
}
95
108
 
96
109
/*
97
110
** For the future (now the thread specific cond is alloced by my_pthread.c)
98
111
*/
99
112
 
 
113
bool init_thr_lock()
 
114
{
 
115
  thr_lock_inited= true;
 
116
 
 
117
  return false;
 
118
}
 
119
 
100
120
static inline bool
101
121
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
102
122
{
108
128
 
109
129
void thr_lock_init(THR_LOCK *lock)
110
130
{
111
 
  lock->init();
 
131
  memset(lock, 0, sizeof(*lock));
 
132
  pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
112
133
  lock->read.last= &lock->read.data;
113
134
  lock->read_wait.last= &lock->read_wait.data;
114
135
  lock->write_wait.last= &lock->write_wait.data;
115
136
  lock->write.last= &lock->write.data;
116
 
}
117
 
 
118
 
 
119
 
void THR_LOCK_INFO::init()
 
137
 
 
138
  pthread_mutex_lock(&internal::THR_LOCK_lock);         /* Add to locks in use */
 
139
  thr_lock_thread_list.push_front(lock);
 
140
  pthread_mutex_unlock(&internal::THR_LOCK_lock);
 
141
}
 
142
 
 
143
 
 
144
void thr_lock_delete(THR_LOCK *lock)
 
145
{
 
146
  pthread_mutex_destroy(&lock->mutex);
 
147
  pthread_mutex_lock(&internal::THR_LOCK_lock);
 
148
  thr_lock_thread_list.remove(lock);
 
149
  pthread_mutex_unlock(&internal::THR_LOCK_lock);
 
150
}
 
151
 
 
152
 
 
153
void thr_lock_info_init(THR_LOCK_INFO *info)
120
154
{
121
155
  internal::st_my_thread_var *tmp= my_thread_var;
122
 
  thread_id= tmp->id;
123
 
  n_cursors= 0;
 
156
  info->thread= tmp->pthread_self;
 
157
  info->thread_id= tmp->id;
 
158
  info->n_cursors= 0;
124
159
}
125
160
 
126
161
        /* Initialize a lock instance */
127
162
 
128
 
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
 
163
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
129
164
{
130
 
  lock= lock_arg;
131
 
  type= TL_UNLOCK;
132
 
  owner= NULL;                               /* no owner yet */
133
 
  status_param= param_arg;
134
 
  cond= NULL;
 
165
  data->lock= lock;
 
166
  data->type= TL_UNLOCK;
 
167
  data->owner= NULL;                               /* no owner yet */
 
168
  data->status_param= param;
 
169
  data->cond= NULL;
135
170
}
136
171
 
137
172
 
149
184
static void wake_up_waiters(THR_LOCK *lock);
150
185
 
151
186
 
152
 
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
 
187
static enum enum_thr_lock_result
 
188
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
 
189
              bool in_wait_list)
153
190
{
154
 
  internal::st_my_thread_var *thread_var= session.getThreadVar();
155
 
 
156
 
  boost::condition_variable_any *cond= &thread_var->suspend;
 
191
  internal::st_my_thread_var *thread_var= my_thread_var;
 
192
  pthread_cond_t *cond= &thread_var->suspend;
 
193
  struct timespec wait_timeout;
157
194
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
158
195
  bool can_deadlock= test(data->owner->info->n_cursors);
159
196
 
 
197
  if (!in_wait_list)
160
198
  {
161
199
    (*wait->last)=data;                         /* Wait for lock */
162
200
    data->prev= wait->last;
163
201
    wait->last= &data->next;
164
202
  }
165
203
 
166
 
  current_global_counters.locks_waited++;
 
204
  statistic_increment(locks_waited, &internal::THR_LOCK_lock);
167
205
 
168
206
  /* Set up control struct to allow others to abort locks */
169
 
  thread_var->current_mutex= data->lock->native_handle();
170
 
  thread_var->current_cond=  &thread_var->suspend;
171
 
  data->cond= &thread_var->suspend;;
 
207
  thread_var->current_mutex= &data->lock->mutex;
 
208
  thread_var->current_cond=  cond;
 
209
  data->cond= cond;
172
210
 
173
 
  while (not thread_var->abort)
 
211
  if (can_deadlock)
 
212
    set_timespec(wait_timeout, table_lock_wait_timeout);
 
213
  while (!thread_var->abort || in_wait_list)
174
214
  {
175
 
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
176
 
 
177
 
    if (can_deadlock)
178
 
    {
179
 
      boost::xtime xt; 
180
 
      xtime_get(&xt, boost::TIME_UTC); 
181
 
      xt.sec += table_lock_wait_timeout; 
182
 
      if (not cond->timed_wait(scoped, xt))
183
 
      {
184
 
        result= THR_LOCK_WAIT_TIMEOUT;
185
 
        scoped.release();
186
 
        break;
187
 
      }
188
 
    }
189
 
    else
190
 
    {
191
 
      cond->wait(scoped);
192
 
    }
 
215
    int rc= (can_deadlock ?
 
216
             pthread_cond_timedwait(cond, &data->lock->mutex,
 
217
                                    &wait_timeout) :
 
218
             pthread_cond_wait(cond, &data->lock->mutex));
193
219
    /*
194
220
      We must break the wait if one of the following occurs:
195
221
      - the connection has been aborted (!thread_var->abort), but
203
229
      Order of checks below is important to not report about timeout
204
230
      if the predicate is true.
205
231
    */
206
 
    if (data->cond == NULL)
207
 
    {
208
 
      scoped.release();
209
 
      break;
210
 
    }
211
 
    scoped.release();
 
232
    if (data->cond == 0)
 
233
    {
 
234
      break;
 
235
    }
 
236
    if (rc == ETIMEDOUT || rc == ETIME)
 
237
    {
 
238
      result= THR_LOCK_WAIT_TIMEOUT;
 
239
      break;
 
240
    }
212
241
  }
213
242
  if (data->cond || data->type == TL_UNLOCK)
214
243
  {
225
254
  else
226
255
  {
227
256
    result= THR_LOCK_SUCCESS;
 
257
    if (data->lock->get_status)
 
258
      (*data->lock->get_status)(data->status_param, 0);
228
259
  }
229
 
  data->lock->unlock();
 
260
  pthread_mutex_unlock(&data->lock->mutex);
230
261
 
231
262
  /* The following must be done after unlock of lock->mutex */
232
 
  boost_unique_lock_t scopedLock(thread_var->mutex);
 
263
  pthread_mutex_lock(&thread_var->mutex);
233
264
  thread_var->current_mutex= NULL;
234
265
  thread_var->current_cond= NULL;
 
266
  pthread_mutex_unlock(&thread_var->mutex);
235
267
  return(result);
236
268
}
237
269
 
238
270
 
239
 
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
 
271
static enum enum_thr_lock_result
 
272
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
 
273
         enum thr_lock_type lock_type)
240
274
{
241
 
  THR_LOCK *lock= data->lock;
 
275
  THR_LOCK *lock=data->lock;
242
276
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
243
277
  struct st_lock_list *wait_queue;
244
278
  THR_LOCK_DATA *lock_owner;
247
281
  data->cond=0;                                 /* safety */
248
282
  data->type=lock_type;
249
283
  data->owner= owner;                           /* Must be reset ! */
250
 
  lock->lock();
 
284
  pthread_mutex_lock(&lock->mutex);
251
285
  if ((int) lock_type <= (int) TL_READ_NO_INSERT)
252
286
  {
253
287
    /* Request for READ lock */
273
307
        lock->read.last= &data->next;
274
308
        if (lock_type == TL_READ_NO_INSERT)
275
309
          lock->read_no_write_count++;
276
 
        current_global_counters.locks_immediate++;
 
310
        if (lock->get_status)
 
311
          (*lock->get_status)(data->status_param, 0);
 
312
        statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
277
313
        goto end;
278
314
      }
279
315
      if (lock->write.data->type == TL_WRITE_ONLY)
291
327
      (*lock->read.last)=data;                  /* Add to running FIFO */
292
328
      data->prev=lock->read.last;
293
329
      lock->read.last= &data->next;
 
330
      if (lock->get_status)
 
331
        (*lock->get_status)(data->status_param, 0);
294
332
      if (lock_type == TL_READ_NO_INSERT)
295
333
        lock->read_no_write_count++;
296
 
      current_global_counters.locks_immediate++;
 
334
      statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
297
335
      goto end;
298
336
    }
299
337
    /*
305
343
  }
306
344
  else                                          /* Request for WRITE lock */
307
345
  {
308
 
    if (lock_type == TL_WRITE_CONCURRENT_INSERT)
 
346
    if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
309
347
      data->type=lock_type= thr_upgraded_concurrent_insert_lock;
310
348
 
311
349
    if (lock->write.data)                       /* If there is a write lock */
340
378
        (*lock->write.last)=data;       /* Add to running fifo */
341
379
        data->prev=lock->write.last;
342
380
        lock->write.last= &data->next;
343
 
        current_global_counters.locks_immediate++;
 
381
        if (data->lock->get_status)
 
382
          (*data->lock->get_status)(data->status_param, 0);
 
383
        statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
344
384
        goto end;
345
385
      }
346
386
    }
352
392
        if (lock_type == TL_WRITE_CONCURRENT_INSERT)
353
393
        {
354
394
          concurrent_insert= 1;
 
395
          if ((*lock->check_status)(data->status_param))
 
396
          {
 
397
            concurrent_insert= 0;
 
398
            data->type=lock_type= thr_upgraded_concurrent_insert_lock;
 
399
          }
355
400
        }
356
401
 
357
402
        if (!lock->read.data ||
363
408
          (*lock->write.last)=data;             /* Add as current write lock */
364
409
          data->prev=lock->write.last;
365
410
          lock->write.last= &data->next;
366
 
          current_global_counters.locks_immediate++;
 
411
          if (data->lock->get_status)
 
412
            (*data->lock->get_status)(data->status_param, concurrent_insert);
 
413
          statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
367
414
          goto end;
368
415
        }
369
416
      }
382
429
    result= THR_LOCK_DEADLOCK;
383
430
    goto end;
384
431
  }
385
 
 
386
432
  /* Can't get lock yet;  Wait for it */
387
 
  return(wait_for_lock(session, wait_queue, data));
 
433
  return(wait_for_lock(wait_queue, data, 0));
388
434
end:
389
 
  lock->unlock();
390
 
 
 
435
  pthread_mutex_unlock(&lock->mutex);
391
436
  return(result);
392
437
}
393
438
 
394
439
 
395
440
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
396
441
{
397
 
  THR_LOCK_DATA *data= lock->read_wait.data;
 
442
  THR_LOCK_DATA *data=lock->read_wait.data;
398
443
 
399
444
  /* move all locks from read_wait list to read list */
400
445
  (*lock->read.last)=data;
406
451
 
407
452
  do
408
453
  {
409
 
    boost::condition_variable_any *cond= data->cond;
 
454
    pthread_cond_t *cond=data->cond;
410
455
    if ((int) data->type == (int) TL_READ_NO_INSERT)
411
456
    {
412
457
      if (using_concurrent_insert)
426
471
      }
427
472
      lock->read_no_write_count++;
428
473
    }
429
 
    data->cond= NULL;                           /* Mark thread free */
430
 
    cond->notify_one();
 
474
    data->cond=0;                               /* Mark thread free */
 
475
    pthread_cond_signal(cond);
431
476
  } while ((data=data->next));
432
477
  *lock->read_wait.last=0;
433
478
  if (!lock->read_wait.data)
434
479
    lock->write_lock_count=0;
435
480
}
436
481
 
437
 
/* Unlock lock and free next thread on same lock */
 
482
        /* Unlock lock and free next thread on same lock */
438
483
 
439
484
static void thr_unlock(THR_LOCK_DATA *data)
440
485
{
441
486
  THR_LOCK *lock=data->lock;
442
487
  enum thr_lock_type lock_type=data->type;
443
 
  lock->lock();
 
488
  pthread_mutex_lock(&lock->mutex);
444
489
 
445
490
  if (((*data->prev)=data->next))               /* remove from lock-list */
446
491
    data->next->prev= data->prev;
449
494
  else
450
495
    lock->write.last=data->prev;
451
496
  if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
452
 
  { }
 
497
  {
 
498
    if (lock->update_status)
 
499
      (*lock->update_status)(data->status_param);
 
500
  }
453
501
  else
454
 
  { }
 
502
  {
 
503
    if (lock->restore_status)
 
504
      (*lock->restore_status)(data->status_param);
 
505
  }
455
506
  if (lock_type == TL_READ_NO_INSERT)
456
507
    lock->read_no_write_count--;
457
508
  data->type=TL_UNLOCK;                         /* Mark unlocked */
458
509
  wake_up_waiters(lock);
459
 
  lock->unlock();
 
510
  pthread_mutex_unlock(&lock->mutex);
 
511
  return;
460
512
}
461
513
 
462
514
 
502
554
          data->prev=lock->write.last;
503
555
          data->next=0;
504
556
          lock->write.last= &data->next;
505
 
 
 
557
          if (data->type == TL_WRITE_CONCURRENT_INSERT &&
 
558
              (*lock->check_status)(data->status_param))
 
559
            data->type=TL_WRITE;                        /* Upgrade lock */
506
560
          {
507
 
            boost::condition_variable_any *cond= data->cond;
508
 
            data->cond= NULL;                           /* Mark thread free */
509
 
            cond->notify_one(); /* Start waiting thred */
 
561
            pthread_cond_t *cond=data->cond;
 
562
            data->cond=0;                               /* Mark thread free */
 
563
            pthread_cond_signal(cond);  /* Start waiting thread */
510
564
          }
511
565
          if (data->type != TL_WRITE_ALLOW_WRITE ||
512
566
              !lock->write_wait.data ||
530
584
               lock_type != TL_WRITE_ALLOW_WRITE) ||
531
585
              !lock->read_no_write_count))
532
586
    {
 
587
      /*
 
588
        For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
 
589
        start WRITE locks together with the READ locks
 
590
      */
 
591
      if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
 
592
          (*lock->check_status)(data->status_param))
 
593
      {
 
594
        data->type=TL_WRITE;                    /* Upgrade lock */
 
595
        if (lock->read_wait.data)
 
596
          free_all_read_locks(lock,0);
 
597
        goto end;
 
598
      }
533
599
      do {
534
 
        boost::condition_variable_any *cond= data->cond;
 
600
        pthread_cond_t *cond=data->cond;
535
601
        if (((*data->prev)=data->next))         /* remove from wait-list */
536
602
          data->next->prev= data->prev;
537
603
        else
540
606
        data->prev=lock->write.last;
541
607
        lock->write.last= &data->next;
542
608
        data->next=0;                           /* Only one write lock */
543
 
        data->cond= NULL;                               /* Mark thread free */
544
 
        cond->notify_one(); /* Start waiting thread */
 
609
        data->cond=0;                           /* Mark thread free */
 
610
        pthread_cond_signal(cond);      /* Start waiting thread */
545
611
      } while (lock_type == TL_WRITE_ALLOW_WRITE &&
546
612
               (data=lock->write_wait.data) &&
547
613
               data->type == TL_WRITE_ALLOW_WRITE);
551
617
                             lock_type == TL_WRITE_ALLOW_WRITE));
552
618
    }
553
619
    else if (!data && lock->read_wait.data)
554
 
    {
555
620
      free_all_read_locks(lock,0);
556
 
    }
557
621
  }
558
622
end:
559
623
  return;
591
655
 
592
656
 
593
657
enum enum_thr_lock_result
594
 
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
 
658
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
595
659
{
596
660
  THR_LOCK_DATA **pos,**end;
597
661
  if (count > 1)
599
663
  /* lock everything */
600
664
  for (pos=data,end=data+count; pos < end ; pos++)
601
665
  {
602
 
    enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
 
666
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
603
667
    if (result != THR_LOCK_SUCCESS)
604
668
    {                                           /* Aborted */
605
669
      thr_multi_unlock(data,(uint32_t) (pos-data));
619
683
    do
620
684
    {
621
685
      pos--;
622
 
      last_lock=(*pos);
 
686
      if (last_lock->lock == (*pos)->lock &&
 
687
          last_lock->lock->copy_status)
 
688
      {
 
689
        if (last_lock->type <= TL_READ_NO_INSERT)
 
690
        {
 
691
          THR_LOCK_DATA **read_lock;
 
692
          /*
 
693
            If we are locking the same table with read locks we must ensure
 
694
            that all tables share the status of the last write lock or
 
695
            the same read lock.
 
696
          */
 
697
          for (;
 
698
               (*pos)->type <= TL_READ_NO_INSERT &&
 
699
                 pos != data &&
 
700
                 pos[-1]->lock == (*pos)->lock ;
 
701
               pos--) ;
 
702
 
 
703
          read_lock = pos+1;
 
704
          do
 
705
          {
 
706
            (last_lock->lock->copy_status)((*read_lock)->status_param,
 
707
                                           (*pos)->status_param);
 
708
          } while (*(read_lock++) != last_lock);
 
709
          last_lock= (*pos);                    /* Point at last write lock */
 
710
        }
 
711
        else
 
712
          (*last_lock->lock->copy_status)((*pos)->status_param,
 
713
                                          last_lock->status_param);
 
714
      }
 
715
      else
 
716
        last_lock=(*pos);
623
717
    } while (pos != data);
624
718
  }
625
719
#endif
640
734
  return;
641
735
}
642
736
 
643
 
void DrizzleLock::unlock(uint32_t count)
644
 
{
645
 
  THR_LOCK_DATA **pos,**end;
646
 
 
647
 
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
648
 
  {
649
 
    if ((*pos)->type != TL_UNLOCK)
650
 
      thr_unlock(*pos);
651
 
  }
652
 
}
653
 
 
654
737
/*
655
738
  Abort all threads waiting for a lock. The lock will be upgraded to
656
739
  TL_WRITE_ONLY to abort any new accesses to the lock
657
740
*/
658
741
 
659
 
void THR_LOCK::abort_locks()
 
742
void thr_abort_locks(THR_LOCK *lock)
660
743
{
661
 
  boost_unique_lock_t scopedLock(mutex);
 
744
  THR_LOCK_DATA *data;
 
745
  pthread_mutex_lock(&lock->mutex);
662
746
 
663
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
747
  for (data=lock->read_wait.data; data ; data=data->next)
664
748
  {
665
 
    local_data->type= TL_UNLOCK;                        /* Mark killed */
 
749
    data->type= TL_UNLOCK;                      /* Mark killed */
666
750
    /* It's safe to signal the cond first: we're still holding the mutex. */
667
 
    local_data->cond->notify_one();
668
 
    local_data->cond= NULL;                             /* Removed from list */
 
751
    pthread_cond_signal(data->cond);
 
752
    data->cond= NULL;                           /* Removed from list */
669
753
  }
670
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
754
  for (data=lock->write_wait.data; data ; data=data->next)
671
755
  {
672
 
    local_data->type= TL_UNLOCK;
673
 
    local_data->cond->notify_one();
674
 
    local_data->cond= NULL;
 
756
    data->type=TL_UNLOCK;
 
757
    pthread_cond_signal(data->cond);
 
758
    data->cond= NULL;
675
759
  }
676
 
  read_wait.last= &read_wait.data;
677
 
  write_wait.last= &write_wait.data;
678
 
  read_wait.data= write_wait.data=0;
679
 
  if (write.data)
680
 
    write.data->type=TL_WRITE_ONLY;
 
760
  lock->read_wait.last= &lock->read_wait.data;
 
761
  lock->write_wait.last= &lock->write_wait.data;
 
762
  lock->read_wait.data=lock->write_wait.data=0;
 
763
  if (lock->write.data)
 
764
    lock->write.data->type=TL_WRITE_ONLY;
 
765
  pthread_mutex_unlock(&lock->mutex);
 
766
  return;
681
767
}
682
768
 
683
769
 
687
773
  This is used to abort all locks for a specific thread
688
774
*/
689
775
 
690
 
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
 
776
bool thr_abort_locks_for_thread(THR_LOCK *lock, uint64_t thread_id)
691
777
{
 
778
  THR_LOCK_DATA *data;
692
779
  bool found= false;
693
780
 
694
 
  boost_unique_lock_t scopedLock(mutex);
695
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
781
  pthread_mutex_lock(&lock->mutex);
 
782
  for (data= lock->read_wait.data; data ; data= data->next)
696
783
  {
697
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
784
    if (data->owner->info->thread_id == thread_id)
698
785
    {
699
 
      local_data->type= TL_UNLOCK;                      /* Mark killed */
 
786
      data->type= TL_UNLOCK;                    /* Mark killed */
700
787
      /* It's safe to signal the cond first: we're still holding the mutex. */
701
788
      found= true;
702
 
      local_data->cond->notify_one();
703
 
      local_data->cond= 0;                              /* Removed from list */
 
789
      pthread_cond_signal(data->cond);
 
790
      data->cond= 0;                            /* Removed from list */
704
791
 
705
 
      if (((*local_data->prev)= local_data->next))
706
 
        local_data->next->prev= local_data->prev;
 
792
      if (((*data->prev)= data->next))
 
793
        data->next->prev= data->prev;
707
794
      else
708
 
        read_wait.last= local_data->prev;
 
795
        lock->read_wait.last= data->prev;
709
796
    }
710
797
  }
711
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
798
  for (data= lock->write_wait.data; data ; data= data->next)
712
799
  {
713
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
800
    if (data->owner->info->thread_id == thread_id)
714
801
    {
715
 
      local_data->type= TL_UNLOCK;
 
802
      data->type= TL_UNLOCK;
716
803
      found= true;
717
 
      local_data->cond->notify_one();
718
 
      local_data->cond= NULL;
 
804
      pthread_cond_signal(data->cond);
 
805
      data->cond= NULL;
719
806
 
720
 
      if (((*local_data->prev)= local_data->next))
721
 
        local_data->next->prev= local_data->prev;
 
807
      if (((*data->prev)= data->next))
 
808
        data->next->prev= data->prev;
722
809
      else
723
 
        write_wait.last= local_data->prev;
 
810
        lock->write_wait.last= data->prev;
724
811
    }
725
812
  }
726
 
  wake_up_waiters(this);
727
 
 
728
 
  return found;
 
813
  wake_up_waiters(lock);
 
814
  pthread_mutex_unlock(&lock->mutex);
 
815
  return(found);
729
816
}
730
817
 
731
818
} /* namespace drizzled */