12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17
17
Read and write locks for Posix threads. All tread must acquire
46
46
should put a pointer to the following functions in the lock structure:
47
47
(If the pointer is zero (default), the function is not called)
50
Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
51
we check if this function exists and returns 0.
52
If not, then the lock is upgraded to TL_WRITE_LOCK
53
In MyISAM this is a simple check if the insert can be done
54
at the end of the datafile.
56
Before a write lock is released, this function is called.
57
In MyISAM this functions updates the count and length of the datafile
59
When one gets a lock this functions is called.
60
In MyISAM this stores the number of rows and size of the datafile
63
50
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
64
51
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
68
55
#include "config.h"
69
56
#include "drizzled/internal/my_sys.h"
57
#include "drizzled/internal/thread_var.h"
58
#include "drizzled/statistics_variables.h"
59
#include "drizzled/pthread_globals.h"
61
#include "drizzled/session.h"
71
63
#include "thr_lock.h"
72
64
#include "drizzled/internal/m_string.h"
87
79
#include <drizzled/util/test.h>
81
#include <boost/interprocess/sync/lock_options.hpp>
89
83
using namespace std;
94
bool thr_lock_inited= false;
95
uint32_t locks_immediate = 0L, locks_waited = 0L;
96
88
uint64_t table_lock_wait_timeout;
97
89
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
100
static list<THR_LOCK *> thr_lock_thread_list; /* List of threads in use */
102
uint64_t max_write_lock_count= ~(uint64_t) 0L;
104
static inline pthread_cond_t *get_cond(void)
106
return &my_thread_var->suspend;
92
uint64_t max_write_lock_count= UINT64_MAX;
94
void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count);
110
97
** For the future (now the thread specific cond is alloced by my_pthread.c)
115
thr_lock_inited= true;
120
100
static inline bool
121
101
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
129
109
void thr_lock_init(THR_LOCK *lock)
131
memset(lock, 0, sizeof(*lock));
132
pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
133
112
lock->read.last= &lock->read.data;
134
113
lock->read_wait.last= &lock->read_wait.data;
135
114
lock->write_wait.last= &lock->write_wait.data;
136
115
lock->write.last= &lock->write.data;
138
pthread_mutex_lock(&internal::THR_LOCK_lock); /* Add to locks in use */
139
thr_lock_thread_list.push_front(lock);
140
pthread_mutex_unlock(&internal::THR_LOCK_lock);
144
void thr_lock_delete(THR_LOCK *lock)
146
pthread_mutex_destroy(&lock->mutex);
147
pthread_mutex_lock(&internal::THR_LOCK_lock);
148
thr_lock_thread_list.remove(lock);
149
pthread_mutex_unlock(&internal::THR_LOCK_lock);
153
void thr_lock_info_init(THR_LOCK_INFO *info)
119
void THR_LOCK_INFO::init()
155
121
internal::st_my_thread_var *tmp= my_thread_var;
156
info->thread= tmp->pthread_self;
157
info->thread_id= tmp->id;
161
126
/* Initialize a lock instance */
163
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
128
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
166
data->type= TL_UNLOCK;
167
data->owner= NULL; /* no owner yet */
168
data->status_param= param;
132
owner= NULL; /* no owner yet */
133
status_param= param_arg;
184
149
static void wake_up_waiters(THR_LOCK *lock);
187
static enum enum_thr_lock_result
188
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
152
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
191
internal::st_my_thread_var *thread_var= my_thread_var;
192
pthread_cond_t *cond= &thread_var->suspend;
193
struct timespec wait_timeout;
154
internal::st_my_thread_var *thread_var= session.getThreadVar();
156
boost::condition_variable_any *cond= &thread_var->suspend;
194
157
enum enum_thr_lock_result result= THR_LOCK_ABORTED;
195
158
bool can_deadlock= test(data->owner->info->n_cursors);
199
161
(*wait->last)=data; /* Wait for lock */
200
162
data->prev= wait->last;
201
163
wait->last= &data->next;
204
statistic_increment(locks_waited, &internal::THR_LOCK_lock);
166
current_global_counters.locks_waited++;
206
168
/* Set up control struct to allow others to abort locks */
207
thread_var->current_mutex= &data->lock->mutex;
208
thread_var->current_cond= cond;
169
thread_var->current_mutex= data->lock->native_handle();
170
thread_var->current_cond= &thread_var->suspend;
171
data->cond= &thread_var->suspend;;
212
set_timespec(wait_timeout, table_lock_wait_timeout);
213
while (!thread_var->abort || in_wait_list)
173
while (not thread_var->abort)
215
int rc= (can_deadlock ?
216
pthread_cond_timedwait(cond, &data->lock->mutex,
218
pthread_cond_wait(cond, &data->lock->mutex));
175
boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
180
xtime_get(&xt, boost::TIME_UTC);
181
xt.sec += table_lock_wait_timeout;
182
if (not cond->timed_wait(scoped, xt))
184
result= THR_LOCK_WAIT_TIMEOUT;
220
194
We must break the wait if one of the following occurs:
221
195
- the connection has been aborted (!thread_var->abort), but
229
203
Order of checks below is important to not report about timeout
230
204
if the predicate is true.
236
if (rc == ETIMEDOUT || rc == ETIME)
238
result= THR_LOCK_WAIT_TIMEOUT;
206
if (data->cond == NULL)
242
213
if (data->cond || data->type == TL_UNLOCK)
256
227
result= THR_LOCK_SUCCESS;
257
if (data->lock->get_status)
258
(*data->lock->get_status)(data->status_param, 0);
260
pthread_mutex_unlock(&data->lock->mutex);
229
data->lock->unlock();
262
231
/* The following must be done after unlock of lock->mutex */
263
pthread_mutex_lock(&thread_var->mutex);
232
boost_unique_lock_t scopedLock(thread_var->mutex);
264
233
thread_var->current_mutex= NULL;
265
234
thread_var->current_cond= NULL;
266
pthread_mutex_unlock(&thread_var->mutex);
271
static enum enum_thr_lock_result
272
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
273
enum thr_lock_type lock_type)
239
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
275
THR_LOCK *lock=data->lock;
241
THR_LOCK *lock= data->lock;
276
242
enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
277
243
struct st_lock_list *wait_queue;
278
244
THR_LOCK_DATA *lock_owner;
281
247
data->cond=0; /* safety */
282
248
data->type=lock_type;
283
249
data->owner= owner; /* Must be reset ! */
284
pthread_mutex_lock(&lock->mutex);
285
251
if ((int) lock_type <= (int) TL_READ_NO_INSERT)
287
253
/* Request for READ lock */
307
273
lock->read.last= &data->next;
308
274
if (lock_type == TL_READ_NO_INSERT)
309
275
lock->read_no_write_count++;
310
if (lock->get_status)
311
(*lock->get_status)(data->status_param, 0);
312
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
276
current_global_counters.locks_immediate++;
315
279
if (lock->write.data->type == TL_WRITE_ONLY)
327
291
(*lock->read.last)=data; /* Add to running FIFO */
328
292
data->prev=lock->read.last;
329
293
lock->read.last= &data->next;
330
if (lock->get_status)
331
(*lock->get_status)(data->status_param, 0);
332
294
if (lock_type == TL_READ_NO_INSERT)
333
295
lock->read_no_write_count++;
334
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
296
current_global_counters.locks_immediate++;
344
306
else /* Request for WRITE lock */
346
if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
308
if (lock_type == TL_WRITE_CONCURRENT_INSERT)
347
309
data->type=lock_type= thr_upgraded_concurrent_insert_lock;
349
311
if (lock->write.data) /* If there is a write lock */
378
340
(*lock->write.last)=data; /* Add to running fifo */
379
341
data->prev=lock->write.last;
380
342
lock->write.last= &data->next;
381
if (data->lock->get_status)
382
(*data->lock->get_status)(data->status_param, 0);
383
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
343
current_global_counters.locks_immediate++;
408
363
(*lock->write.last)=data; /* Add as current write lock */
409
364
data->prev=lock->write.last;
410
365
lock->write.last= &data->next;
411
if (data->lock->get_status)
412
(*data->lock->get_status)(data->status_param, concurrent_insert);
413
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
366
current_global_counters.locks_immediate++;
429
382
result= THR_LOCK_DEADLOCK;
432
386
/* Can't get lock yet; Wait for it */
433
return(wait_for_lock(wait_queue, data, 0));
387
return(wait_for_lock(session, wait_queue, data));
435
pthread_mutex_unlock(&lock->mutex);
440
395
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
442
THR_LOCK_DATA *data=lock->read_wait.data;
397
THR_LOCK_DATA *data= lock->read_wait.data;
444
399
/* move all locks from read_wait list to read list */
445
400
(*lock->read.last)=data;
472
427
lock->read_no_write_count++;
474
data->cond=0; /* Mark thread free */
475
pthread_cond_signal(cond);
429
data->cond= NULL; /* Mark thread free */
476
431
} while ((data=data->next));
477
432
*lock->read_wait.last=0;
478
433
if (!lock->read_wait.data)
479
434
lock->write_lock_count=0;
482
/* Unlock lock and free next thread on same lock */
437
/* Unlock lock and free next thread on same lock */
484
439
static void thr_unlock(THR_LOCK_DATA *data)
486
441
THR_LOCK *lock=data->lock;
487
442
enum thr_lock_type lock_type=data->type;
488
pthread_mutex_lock(&lock->mutex);
490
445
if (((*data->prev)=data->next)) /* remove from lock-list */
491
446
data->next->prev= data->prev;
495
450
lock->write.last=data->prev;
496
451
if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
498
if (lock->update_status)
499
(*lock->update_status)(data->status_param);
503
if (lock->restore_status)
504
(*lock->restore_status)(data->status_param);
506
455
if (lock_type == TL_READ_NO_INSERT)
507
456
lock->read_no_write_count--;
508
457
data->type=TL_UNLOCK; /* Mark unlocked */
509
458
wake_up_waiters(lock);
510
pthread_mutex_unlock(&lock->mutex);
554
502
data->prev=lock->write.last;
556
504
lock->write.last= &data->next;
557
if (data->type == TL_WRITE_CONCURRENT_INSERT &&
558
(*lock->check_status)(data->status_param))
559
data->type=TL_WRITE; /* Upgrade lock */
561
pthread_cond_t *cond=data->cond;
562
data->cond=0; /* Mark thread free */
563
pthread_cond_signal(cond); /* Start waiting thread */
507
boost::condition_variable_any *cond= data->cond;
508
data->cond= NULL; /* Mark thread free */
509
cond->notify_one(); /* Start waiting thred */
565
511
if (data->type != TL_WRITE_ALLOW_WRITE ||
566
512
!lock->write_wait.data ||
584
530
lock_type != TL_WRITE_ALLOW_WRITE) ||
585
531
!lock->read_no_write_count))
588
For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
589
start WRITE locks together with the READ locks
591
if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
592
(*lock->check_status)(data->status_param))
594
data->type=TL_WRITE; /* Upgrade lock */
595
if (lock->read_wait.data)
596
free_all_read_locks(lock,0);
600
pthread_cond_t *cond=data->cond;
534
boost::condition_variable_any *cond= data->cond;
601
535
if (((*data->prev)=data->next)) /* remove from wait-list */
602
536
data->next->prev= data->prev;
606
540
data->prev=lock->write.last;
607
541
lock->write.last= &data->next;
608
542
data->next=0; /* Only one write lock */
609
data->cond=0; /* Mark thread free */
610
pthread_cond_signal(cond); /* Start waiting thread */
543
data->cond= NULL; /* Mark thread free */
544
cond->notify_one(); /* Start waiting thread */
611
545
} while (lock_type == TL_WRITE_ALLOW_WRITE &&
612
546
(data=lock->write_wait.data) &&
613
547
data->type == TL_WRITE_ALLOW_WRITE);
657
593
enum enum_thr_lock_result
658
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
594
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
660
596
THR_LOCK_DATA **pos,**end;
663
599
/* lock everything */
664
600
for (pos=data,end=data+count; pos < end ; pos++)
666
enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
602
enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
667
603
if (result != THR_LOCK_SUCCESS)
669
605
thr_multi_unlock(data,(uint32_t) (pos-data));
686
if (last_lock->lock == (*pos)->lock &&
687
last_lock->lock->copy_status)
689
if (last_lock->type <= TL_READ_NO_INSERT)
691
THR_LOCK_DATA **read_lock;
693
If we are locking the same table with read locks we must ensure
694
that all tables share the status of the last write lock or
698
(*pos)->type <= TL_READ_NO_INSERT &&
700
pos[-1]->lock == (*pos)->lock ;
706
(last_lock->lock->copy_status)((*read_lock)->status_param,
707
(*pos)->status_param);
708
} while (*(read_lock++) != last_lock);
709
last_lock= (*pos); /* Point at last write lock */
712
(*last_lock->lock->copy_status)((*pos)->status_param,
713
last_lock->status_param);
717
623
} while (pos != data);
643
void DrizzleLock::unlock(uint32_t count)
645
THR_LOCK_DATA **pos,**end;
647
for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
649
if ((*pos)->type != TL_UNLOCK)
738
655
Abort all threads waiting for a lock. The lock will be upgraded to
739
656
TL_WRITE_ONLY to abort any new accesses to the lock
742
void thr_abort_locks(THR_LOCK *lock)
659
void THR_LOCK::abort_locks()
745
pthread_mutex_lock(&lock->mutex);
661
boost_unique_lock_t scopedLock(mutex);
747
for (data=lock->read_wait.data; data ; data=data->next)
663
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
749
data->type= TL_UNLOCK; /* Mark killed */
665
local_data->type= TL_UNLOCK; /* Mark killed */
750
666
/* It's safe to signal the cond first: we're still holding the mutex. */
751
pthread_cond_signal(data->cond);
752
data->cond= NULL; /* Removed from list */
667
local_data->cond->notify_one();
668
local_data->cond= NULL; /* Removed from list */
754
for (data=lock->write_wait.data; data ; data=data->next)
670
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
756
data->type=TL_UNLOCK;
757
pthread_cond_signal(data->cond);
672
local_data->type= TL_UNLOCK;
673
local_data->cond->notify_one();
674
local_data->cond= NULL;
760
lock->read_wait.last= &lock->read_wait.data;
761
lock->write_wait.last= &lock->write_wait.data;
762
lock->read_wait.data=lock->write_wait.data=0;
763
if (lock->write.data)
764
lock->write.data->type=TL_WRITE_ONLY;
765
pthread_mutex_unlock(&lock->mutex);
676
read_wait.last= &read_wait.data;
677
write_wait.last= &write_wait.data;
678
read_wait.data= write_wait.data=0;
680
write.data->type=TL_WRITE_ONLY;
773
687
This is used to abort all locks for a specific thread
776
bool thr_abort_locks_for_thread(THR_LOCK *lock, uint64_t thread_id)
690
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
779
692
bool found= false;
781
pthread_mutex_lock(&lock->mutex);
782
for (data= lock->read_wait.data; data ; data= data->next)
694
boost_unique_lock_t scopedLock(mutex);
695
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
784
if (data->owner->info->thread_id == thread_id)
697
if (local_data->owner->info->thread_id == thread_id_arg)
786
data->type= TL_UNLOCK; /* Mark killed */
699
local_data->type= TL_UNLOCK; /* Mark killed */
787
700
/* It's safe to signal the cond first: we're still holding the mutex. */
789
pthread_cond_signal(data->cond);
790
data->cond= 0; /* Removed from list */
702
local_data->cond->notify_one();
703
local_data->cond= 0; /* Removed from list */
792
if (((*data->prev)= data->next))
793
data->next->prev= data->prev;
705
if (((*local_data->prev)= local_data->next))
706
local_data->next->prev= local_data->prev;
795
lock->read_wait.last= data->prev;
708
read_wait.last= local_data->prev;
798
for (data= lock->write_wait.data; data ; data= data->next)
711
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
800
if (data->owner->info->thread_id == thread_id)
713
if (local_data->owner->info->thread_id == thread_id_arg)
802
data->type= TL_UNLOCK;
715
local_data->type= TL_UNLOCK;
804
pthread_cond_signal(data->cond);
717
local_data->cond->notify_one();
718
local_data->cond= NULL;
807
if (((*data->prev)= data->next))
808
data->next->prev= data->prev;
720
if (((*local_data->prev)= local_data->next))
721
local_data->next->prev= local_data->prev;
810
lock->write_wait.last= data->prev;
723
write_wait.last= local_data->prev;
813
wake_up_waiters(lock);
814
pthread_mutex_unlock(&lock->mutex);
726
wake_up_waiters(this);
818
731
} /* namespace drizzled */