12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
Read and write locks for Posix threads. All tread must acquire
46
46
should put a pointer to the following functions in the lock structure:
47
47
(If the pointer is zero (default), the function is not called)
50
Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
51
we check if this function exists and returns 0.
52
If not, then the lock is upgraded to TL_WRITE_LOCK
53
In MyISAM this is a simple check if the insert can be done
54
at the end of the datafile.
56
Before a write lock is released, this function is called.
57
In MyISAM this functions updates the count and length of the datafile
59
When one gets a lock this functions is called.
60
In MyISAM this stores the number of rows and size of the datafile
50
63
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
64
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
55
68
#include "config.h"
56
69
#include "drizzled/internal/my_sys.h"
57
#include "drizzled/internal/thread_var.h"
58
#include "drizzled/statistics_variables.h"
59
#include "drizzled/pthread_globals.h"
61
#include "drizzled/session.h"
63
71
#include "thr_lock.h"
64
72
#include "drizzled/internal/m_string.h"
79
87
#include <drizzled/util/test.h>
81
#include <boost/interprocess/sync/lock_options.hpp>
83
89
using namespace std;
94
bool thr_lock_inited= false;
95
uint32_t locks_immediate = 0L, locks_waited = 0L;
88
96
uint64_t table_lock_wait_timeout;
89
97
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
92
uint64_t max_write_lock_count= UINT64_MAX;
94
void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count);
100
static list<THR_LOCK *> thr_lock_thread_list; /* List of threads in use */
102
uint64_t max_write_lock_count= ~(uint64_t) 0L;
104
static inline pthread_cond_t *get_cond(void)
106
return &my_thread_var->suspend;
97
110
** For the future (now the thread specific cond is alloced by my_pthread.c)
115
thr_lock_inited= true;
100
120
static inline bool
101
121
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
109
129
void thr_lock_init(THR_LOCK *lock)
131
memset(lock, 0, sizeof(*lock));
132
pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
112
133
lock->read.last= &lock->read.data;
113
134
lock->read_wait.last= &lock->read_wait.data;
114
135
lock->write_wait.last= &lock->write_wait.data;
115
136
lock->write.last= &lock->write.data;
119
void THR_LOCK_INFO::init()
138
pthread_mutex_lock(&internal::THR_LOCK_lock); /* Add to locks in use */
139
thr_lock_thread_list.push_front(lock);
140
pthread_mutex_unlock(&internal::THR_LOCK_lock);
144
void thr_lock_delete(THR_LOCK *lock)
146
pthread_mutex_destroy(&lock->mutex);
147
pthread_mutex_lock(&internal::THR_LOCK_lock);
148
thr_lock_thread_list.remove(lock);
149
pthread_mutex_unlock(&internal::THR_LOCK_lock);
153
void thr_lock_info_init(THR_LOCK_INFO *info)
121
155
internal::st_my_thread_var *tmp= my_thread_var;
156
info->thread= tmp->pthread_self;
157
info->thread_id= tmp->id;
126
161
/* Initialize a lock instance */
128
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
163
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
132
owner= NULL; /* no owner yet */
133
status_param= param_arg;
166
data->type= TL_UNLOCK;
167
data->owner= NULL; /* no owner yet */
168
data->status_param= param;
149
184
static void wake_up_waiters(THR_LOCK *lock);
152
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
187
static enum enum_thr_lock_result
188
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
154
internal::st_my_thread_var *thread_var= session.getThreadVar();
156
boost::condition_variable_any *cond= &thread_var->suspend;
191
internal::st_my_thread_var *thread_var= my_thread_var;
192
pthread_cond_t *cond= &thread_var->suspend;
193
struct timespec wait_timeout;
157
194
enum enum_thr_lock_result result= THR_LOCK_ABORTED;
158
195
bool can_deadlock= test(data->owner->info->n_cursors);
161
199
(*wait->last)=data; /* Wait for lock */
162
200
data->prev= wait->last;
163
201
wait->last= &data->next;
166
current_global_counters.locks_waited++;
204
statistic_increment(locks_waited, &internal::THR_LOCK_lock);
168
206
/* Set up control struct to allow others to abort locks */
169
thread_var->current_mutex= data->lock->native_handle();
170
thread_var->current_cond= &thread_var->suspend;
171
data->cond= &thread_var->suspend;;
207
thread_var->current_mutex= &data->lock->mutex;
208
thread_var->current_cond= cond;
173
while (not thread_var->abort)
212
set_timespec(wait_timeout, table_lock_wait_timeout);
213
while (!thread_var->abort || in_wait_list)
175
boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
180
xtime_get(&xt, boost::TIME_UTC);
181
xt.sec += table_lock_wait_timeout;
182
if (not cond->timed_wait(scoped, xt))
184
result= THR_LOCK_WAIT_TIMEOUT;
215
int rc= (can_deadlock ?
216
pthread_cond_timedwait(cond, &data->lock->mutex,
218
pthread_cond_wait(cond, &data->lock->mutex));
194
220
We must break the wait if one of the following occurs:
195
221
- the connection has been aborted (!thread_var->abort), but
203
229
Order of checks below is important to not report about timeout
204
230
if the predicate is true.
206
if (data->cond == NULL)
236
if (rc == ETIMEDOUT || rc == ETIME)
238
result= THR_LOCK_WAIT_TIMEOUT;
213
242
if (data->cond || data->type == TL_UNLOCK)
227
256
result= THR_LOCK_SUCCESS;
257
if (data->lock->get_status)
258
(*data->lock->get_status)(data->status_param, 0);
229
data->lock->unlock();
260
pthread_mutex_unlock(&data->lock->mutex);
231
262
/* The following must be done after unlock of lock->mutex */
232
boost_unique_lock_t scopedLock(thread_var->mutex);
263
pthread_mutex_lock(&thread_var->mutex);
233
264
thread_var->current_mutex= NULL;
234
265
thread_var->current_cond= NULL;
266
pthread_mutex_unlock(&thread_var->mutex);
239
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
271
static enum enum_thr_lock_result
272
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
273
enum thr_lock_type lock_type)
241
THR_LOCK *lock= data->lock;
275
THR_LOCK *lock=data->lock;
242
276
enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
243
277
struct st_lock_list *wait_queue;
244
278
THR_LOCK_DATA *lock_owner;
247
281
data->cond=0; /* safety */
248
282
data->type=lock_type;
249
283
data->owner= owner; /* Must be reset ! */
284
pthread_mutex_lock(&lock->mutex);
251
285
if ((int) lock_type <= (int) TL_READ_NO_INSERT)
253
287
/* Request for READ lock */
273
307
lock->read.last= &data->next;
274
308
if (lock_type == TL_READ_NO_INSERT)
275
309
lock->read_no_write_count++;
276
current_global_counters.locks_immediate++;
310
if (lock->get_status)
311
(*lock->get_status)(data->status_param, 0);
312
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
279
315
if (lock->write.data->type == TL_WRITE_ONLY)
291
327
(*lock->read.last)=data; /* Add to running FIFO */
292
328
data->prev=lock->read.last;
293
329
lock->read.last= &data->next;
330
if (lock->get_status)
331
(*lock->get_status)(data->status_param, 0);
294
332
if (lock_type == TL_READ_NO_INSERT)
295
333
lock->read_no_write_count++;
296
current_global_counters.locks_immediate++;
334
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
306
344
else /* Request for WRITE lock */
308
if (lock_type == TL_WRITE_CONCURRENT_INSERT)
346
if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
309
347
data->type=lock_type= thr_upgraded_concurrent_insert_lock;
311
349
if (lock->write.data) /* If there is a write lock */
340
378
(*lock->write.last)=data; /* Add to running fifo */
341
379
data->prev=lock->write.last;
342
380
lock->write.last= &data->next;
343
current_global_counters.locks_immediate++;
381
if (data->lock->get_status)
382
(*data->lock->get_status)(data->status_param, 0);
383
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
363
408
(*lock->write.last)=data; /* Add as current write lock */
364
409
data->prev=lock->write.last;
365
410
lock->write.last= &data->next;
366
current_global_counters.locks_immediate++;
411
if (data->lock->get_status)
412
(*data->lock->get_status)(data->status_param, concurrent_insert);
413
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
382
429
result= THR_LOCK_DEADLOCK;
386
432
/* Can't get lock yet; Wait for it */
387
return(wait_for_lock(session, wait_queue, data));
433
return(wait_for_lock(wait_queue, data, 0));
435
pthread_mutex_unlock(&lock->mutex);
395
440
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
397
THR_LOCK_DATA *data= lock->read_wait.data;
442
THR_LOCK_DATA *data=lock->read_wait.data;
399
444
/* move all locks from read_wait list to read list */
400
445
(*lock->read.last)=data;
427
472
lock->read_no_write_count++;
429
data->cond= NULL; /* Mark thread free */
474
data->cond=0; /* Mark thread free */
475
pthread_cond_signal(cond);
431
476
} while ((data=data->next));
432
477
*lock->read_wait.last=0;
433
478
if (!lock->read_wait.data)
434
479
lock->write_lock_count=0;
437
/* Unlock lock and free next thread on same lock */
482
/* Unlock lock and free next thread on same lock */
439
484
static void thr_unlock(THR_LOCK_DATA *data)
441
486
THR_LOCK *lock=data->lock;
442
487
enum thr_lock_type lock_type=data->type;
488
pthread_mutex_lock(&lock->mutex);
445
490
if (((*data->prev)=data->next)) /* remove from lock-list */
446
491
data->next->prev= data->prev;
450
495
lock->write.last=data->prev;
451
496
if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
498
if (lock->update_status)
499
(*lock->update_status)(data->status_param);
503
if (lock->restore_status)
504
(*lock->restore_status)(data->status_param);
455
506
if (lock_type == TL_READ_NO_INSERT)
456
507
lock->read_no_write_count--;
457
508
data->type=TL_UNLOCK; /* Mark unlocked */
458
509
wake_up_waiters(lock);
510
pthread_mutex_unlock(&lock->mutex);
502
554
data->prev=lock->write.last;
504
556
lock->write.last= &data->next;
557
if (data->type == TL_WRITE_CONCURRENT_INSERT &&
558
(*lock->check_status)(data->status_param))
559
data->type=TL_WRITE; /* Upgrade lock */
507
boost::condition_variable_any *cond= data->cond;
508
data->cond= NULL; /* Mark thread free */
509
cond->notify_one(); /* Start waiting thred */
561
pthread_cond_t *cond=data->cond;
562
data->cond=0; /* Mark thread free */
563
pthread_cond_signal(cond); /* Start waiting thread */
511
565
if (data->type != TL_WRITE_ALLOW_WRITE ||
512
566
!lock->write_wait.data ||
530
584
lock_type != TL_WRITE_ALLOW_WRITE) ||
531
585
!lock->read_no_write_count))
588
For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
589
start WRITE locks together with the READ locks
591
if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
592
(*lock->check_status)(data->status_param))
594
data->type=TL_WRITE; /* Upgrade lock */
595
if (lock->read_wait.data)
596
free_all_read_locks(lock,0);
534
boost::condition_variable_any *cond= data->cond;
600
pthread_cond_t *cond=data->cond;
535
601
if (((*data->prev)=data->next)) /* remove from wait-list */
536
602
data->next->prev= data->prev;
540
606
data->prev=lock->write.last;
541
607
lock->write.last= &data->next;
542
608
data->next=0; /* Only one write lock */
543
data->cond= NULL; /* Mark thread free */
544
cond->notify_one(); /* Start waiting thread */
609
data->cond=0; /* Mark thread free */
610
pthread_cond_signal(cond); /* Start waiting thread */
545
611
} while (lock_type == TL_WRITE_ALLOW_WRITE &&
546
612
(data=lock->write_wait.data) &&
547
613
data->type == TL_WRITE_ALLOW_WRITE);
593
657
enum enum_thr_lock_result
594
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
658
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
596
660
THR_LOCK_DATA **pos,**end;
599
663
/* lock everything */
600
664
for (pos=data,end=data+count; pos < end ; pos++)
602
enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
666
enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
603
667
if (result != THR_LOCK_SUCCESS)
605
669
thr_multi_unlock(data,(uint32_t) (pos-data));
686
if (last_lock->lock == (*pos)->lock &&
687
last_lock->lock->copy_status)
689
if (last_lock->type <= TL_READ_NO_INSERT)
691
THR_LOCK_DATA **read_lock;
693
If we are locking the same table with read locks we must ensure
694
that all tables share the status of the last write lock or
698
(*pos)->type <= TL_READ_NO_INSERT &&
700
pos[-1]->lock == (*pos)->lock ;
706
(last_lock->lock->copy_status)((*read_lock)->status_param,
707
(*pos)->status_param);
708
} while (*(read_lock++) != last_lock);
709
last_lock= (*pos); /* Point at last write lock */
712
(*last_lock->lock->copy_status)((*pos)->status_param,
713
last_lock->status_param);
623
717
} while (pos != data);
643
void DrizzleLock::unlock(uint32_t count)
645
THR_LOCK_DATA **pos,**end;
647
for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
649
if ((*pos)->type != TL_UNLOCK)
655
738
Abort all threads waiting for a lock. The lock will be upgraded to
656
739
TL_WRITE_ONLY to abort any new accesses to the lock
659
void THR_LOCK::abort_locks()
742
void thr_abort_locks(THR_LOCK *lock)
661
boost_unique_lock_t scopedLock(mutex);
745
pthread_mutex_lock(&lock->mutex);
663
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
747
for (data=lock->read_wait.data; data ; data=data->next)
665
local_data->type= TL_UNLOCK; /* Mark killed */
749
data->type= TL_UNLOCK; /* Mark killed */
666
750
/* It's safe to signal the cond first: we're still holding the mutex. */
667
local_data->cond->notify_one();
668
local_data->cond= NULL; /* Removed from list */
751
pthread_cond_signal(data->cond);
752
data->cond= NULL; /* Removed from list */
670
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
754
for (data=lock->write_wait.data; data ; data=data->next)
672
local_data->type= TL_UNLOCK;
673
local_data->cond->notify_one();
674
local_data->cond= NULL;
756
data->type=TL_UNLOCK;
757
pthread_cond_signal(data->cond);
676
read_wait.last= &read_wait.data;
677
write_wait.last= &write_wait.data;
678
read_wait.data= write_wait.data=0;
680
write.data->type=TL_WRITE_ONLY;
760
lock->read_wait.last= &lock->read_wait.data;
761
lock->write_wait.last= &lock->write_wait.data;
762
lock->read_wait.data=lock->write_wait.data=0;
763
if (lock->write.data)
764
lock->write.data->type=TL_WRITE_ONLY;
765
pthread_mutex_unlock(&lock->mutex);
687
773
This is used to abort all locks for a specific thread
690
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
776
bool thr_abort_locks_for_thread(THR_LOCK *lock, uint64_t thread_id)
692
779
bool found= false;
694
boost_unique_lock_t scopedLock(mutex);
695
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
781
pthread_mutex_lock(&lock->mutex);
782
for (data= lock->read_wait.data; data ; data= data->next)
697
if (local_data->owner->info->thread_id == thread_id_arg)
784
if (data->owner->info->thread_id == thread_id)
699
local_data->type= TL_UNLOCK; /* Mark killed */
786
data->type= TL_UNLOCK; /* Mark killed */
700
787
/* It's safe to signal the cond first: we're still holding the mutex. */
702
local_data->cond->notify_one();
703
local_data->cond= 0; /* Removed from list */
789
pthread_cond_signal(data->cond);
790
data->cond= 0; /* Removed from list */
705
if (((*local_data->prev)= local_data->next))
706
local_data->next->prev= local_data->prev;
792
if (((*data->prev)= data->next))
793
data->next->prev= data->prev;
708
read_wait.last= local_data->prev;
795
lock->read_wait.last= data->prev;
711
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
798
for (data= lock->write_wait.data; data ; data= data->next)
713
if (local_data->owner->info->thread_id == thread_id_arg)
800
if (data->owner->info->thread_id == thread_id)
715
local_data->type= TL_UNLOCK;
802
data->type= TL_UNLOCK;
717
local_data->cond->notify_one();
718
local_data->cond= NULL;
804
pthread_cond_signal(data->cond);
720
if (((*local_data->prev)= local_data->next))
721
local_data->next->prev= local_data->prev;
807
if (((*data->prev)= data->next))
808
data->next->prev= data->prev;
723
write_wait.last= local_data->prev;
810
lock->write_wait.last= data->prev;
726
wake_up_waiters(this);
813
wake_up_waiters(lock);
814
pthread_mutex_unlock(&lock->mutex);
731
818
} /* namespace drizzled */