12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
Read and write locks for Posix threads. All threads must acquire
46
46
should put a pointer to the following functions in the lock structure:
47
47
(If the pointer is zero (default), the function is not called)
50
Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
51
we check if this function exists and returns 0.
52
If not, then the lock is upgraded to TL_WRITE_LOCK
53
In MyISAM this is a simple check if the insert can be done
54
at the end of the datafile.
56
Before a write lock is released, this function is called.
57
In MyISAM this function updates the count and length of the datafile
59
When one gets a lock this function is called.
60
In MyISAM this stores the number of rows and size of the datafile
50
63
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
64
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
56
#include "drizzled/internal/my_sys.h"
57
#include "drizzled/internal/thread_var.h"
58
#include "drizzled/statistics_variables.h"
59
#include "drizzled/pthread_globals.h"
61
#include "drizzled/session.h"
68
#include "mysys/mysys_priv.h"
63
70
#include "thr_lock.h"
64
#include "drizzled/internal/m_string.h"
71
#include <mystrings/m_string.h>
79
86
#include <drizzled/util/test.h>
81
#include <boost/interprocess/sync/lock_options.hpp>
83
88
using namespace std;
90
bool thr_lock_inited= false;
91
uint32_t locks_immediate = 0L, locks_waited = 0L;
88
92
uint64_t table_lock_wait_timeout;
89
93
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
92
uint64_t max_write_lock_count= UINT64_MAX;
94
void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count);
96
static list<THR_LOCK *> thr_lock_thread_list; /* List of threads in use */
98
uint64_t max_write_lock_count= ~(uint64_t) 0L;
100
static inline pthread_cond_t *get_cond(void)
102
return &my_thread_var->suspend;
97
106
** For the future (now the thread specific cond is alloced by my_pthread.c)
111
thr_lock_inited= true;
100
116
static inline bool
101
117
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
109
125
void thr_lock_init(THR_LOCK *lock)
127
memset(lock, 0, sizeof(*lock));
128
pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
112
129
lock->read.last= &lock->read.data;
113
130
lock->read_wait.last= &lock->read_wait.data;
114
131
lock->write_wait.last= &lock->write_wait.data;
115
132
lock->write.last= &lock->write.data;
119
void THR_LOCK_INFO::init()
121
internal::st_my_thread_var *tmp= my_thread_var;
134
pthread_mutex_lock(&THR_LOCK_lock); /* Add to locks in use */
135
thr_lock_thread_list.push_front(lock);
136
pthread_mutex_unlock(&THR_LOCK_lock);
140
void thr_lock_delete(THR_LOCK *lock)
142
pthread_mutex_destroy(&lock->mutex);
143
pthread_mutex_lock(&THR_LOCK_lock);
144
thr_lock_thread_list.remove(lock);
145
pthread_mutex_unlock(&THR_LOCK_lock);
149
void thr_lock_info_init(THR_LOCK_INFO *info)
151
struct st_my_thread_var *tmp= my_thread_var;
152
info->thread= tmp->pthread_self;
153
info->thread_id= tmp->id;
126
157
/* Initialize a lock instance */
128
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
159
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
132
owner= NULL; /* no owner yet */
133
status_param= param_arg;
162
data->type= TL_UNLOCK;
163
data->owner= NULL; /* no owner yet */
164
data->status_param= param;
149
180
static void wake_up_waiters(THR_LOCK *lock);
152
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
183
static enum enum_thr_lock_result
184
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
154
internal::st_my_thread_var *thread_var= session.getThreadVar();
156
boost::condition_variable_any *cond= &thread_var->suspend;
187
struct st_my_thread_var *thread_var= my_thread_var;
188
pthread_cond_t *cond= &thread_var->suspend;
189
struct timespec wait_timeout;
157
190
enum enum_thr_lock_result result= THR_LOCK_ABORTED;
158
191
bool can_deadlock= test(data->owner->info->n_cursors);
161
195
(*wait->last)=data; /* Wait for lock */
162
196
data->prev= wait->last;
163
197
wait->last= &data->next;
166
current_global_counters.locks_waited++;
200
statistic_increment(locks_waited, &THR_LOCK_lock);
168
202
/* Set up control struct to allow others to abort locks */
169
thread_var->current_mutex= data->lock->native_handle();
170
thread_var->current_cond= &thread_var->suspend;
171
data->cond= &thread_var->suspend;;
203
thread_var->current_mutex= &data->lock->mutex;
204
thread_var->current_cond= cond;
173
while (not thread_var->abort)
208
set_timespec(wait_timeout, table_lock_wait_timeout);
209
while (!thread_var->abort || in_wait_list)
175
boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
180
xtime_get(&xt, boost::TIME_UTC);
181
xt.sec += table_lock_wait_timeout;
182
if (not cond->timed_wait(scoped, xt))
184
result= THR_LOCK_WAIT_TIMEOUT;
211
int rc= (can_deadlock ?
212
pthread_cond_timedwait(cond, &data->lock->mutex,
214
pthread_cond_wait(cond, &data->lock->mutex));
194
216
We must break the wait if one of the following occurs:
195
217
- the connection has been aborted (!thread_var->abort), but
203
225
Order of checks below is important to not report about timeout
204
226
if the predicate is true.
206
if (data->cond == NULL)
232
if (rc == ETIMEDOUT || rc == ETIME)
234
result= THR_LOCK_WAIT_TIMEOUT;
213
238
if (data->cond || data->type == TL_UNLOCK)
227
252
result= THR_LOCK_SUCCESS;
253
if (data->lock->get_status)
254
(*data->lock->get_status)(data->status_param, 0);
229
data->lock->unlock();
256
pthread_mutex_unlock(&data->lock->mutex);
231
258
/* The following must be done after unlock of lock->mutex */
232
boost_unique_lock_t scopedLock(thread_var->mutex);
259
pthread_mutex_lock(&thread_var->mutex);
233
260
thread_var->current_mutex= NULL;
234
261
thread_var->current_cond= NULL;
262
pthread_mutex_unlock(&thread_var->mutex);
239
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
267
static enum enum_thr_lock_result
268
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
269
enum thr_lock_type lock_type)
241
THR_LOCK *lock= data->lock;
271
THR_LOCK *lock=data->lock;
242
272
enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
243
273
struct st_lock_list *wait_queue;
244
274
THR_LOCK_DATA *lock_owner;
247
277
data->cond=0; /* safety */
248
278
data->type=lock_type;
249
279
data->owner= owner; /* Must be reset ! */
280
pthread_mutex_lock(&lock->mutex);
251
281
if ((int) lock_type <= (int) TL_READ_NO_INSERT)
253
283
/* Request for READ lock */
273
303
lock->read.last= &data->next;
274
304
if (lock_type == TL_READ_NO_INSERT)
275
305
lock->read_no_write_count++;
276
current_global_counters.locks_immediate++;
306
if (lock->get_status)
307
(*lock->get_status)(data->status_param, 0);
308
statistic_increment(locks_immediate,&THR_LOCK_lock);
279
311
if (lock->write.data->type == TL_WRITE_ONLY)
291
323
(*lock->read.last)=data; /* Add to running FIFO */
292
324
data->prev=lock->read.last;
293
325
lock->read.last= &data->next;
326
if (lock->get_status)
327
(*lock->get_status)(data->status_param, 0);
294
328
if (lock_type == TL_READ_NO_INSERT)
295
329
lock->read_no_write_count++;
296
current_global_counters.locks_immediate++;
330
statistic_increment(locks_immediate,&THR_LOCK_lock);
306
340
else /* Request for WRITE lock */
308
if (lock_type == TL_WRITE_CONCURRENT_INSERT)
342
if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
309
343
data->type=lock_type= thr_upgraded_concurrent_insert_lock;
311
345
if (lock->write.data) /* If there is a write lock */
340
374
(*lock->write.last)=data; /* Add to running fifo */
341
375
data->prev=lock->write.last;
342
376
lock->write.last= &data->next;
343
current_global_counters.locks_immediate++;
377
if (data->lock->get_status)
378
(*data->lock->get_status)(data->status_param, 0);
379
statistic_increment(locks_immediate,&THR_LOCK_lock);
363
404
(*lock->write.last)=data; /* Add as current write lock */
364
405
data->prev=lock->write.last;
365
406
lock->write.last= &data->next;
366
current_global_counters.locks_immediate++;
407
if (data->lock->get_status)
408
(*data->lock->get_status)(data->status_param, concurrent_insert);
409
statistic_increment(locks_immediate,&THR_LOCK_lock);
382
425
result= THR_LOCK_DEADLOCK;
386
428
/* Can't get lock yet; Wait for it */
387
return(wait_for_lock(session, wait_queue, data));
429
return(wait_for_lock(wait_queue, data, 0));
431
pthread_mutex_unlock(&lock->mutex);
395
436
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
397
THR_LOCK_DATA *data= lock->read_wait.data;
438
THR_LOCK_DATA *data=lock->read_wait.data;
399
440
/* move all locks from read_wait list to read list */
400
441
(*lock->read.last)=data;
427
468
lock->read_no_write_count++;
429
data->cond= NULL; /* Mark thread free */
470
data->cond=0; /* Mark thread free */
471
pthread_cond_signal(cond);
431
472
} while ((data=data->next));
432
473
*lock->read_wait.last=0;
433
474
if (!lock->read_wait.data)
434
475
lock->write_lock_count=0;
437
/* Unlock lock and free next thread on same lock */
478
/* Unlock lock and free next thread on same lock */
439
480
static void thr_unlock(THR_LOCK_DATA *data)
441
482
THR_LOCK *lock=data->lock;
442
483
enum thr_lock_type lock_type=data->type;
484
pthread_mutex_lock(&lock->mutex);
445
486
if (((*data->prev)=data->next)) /* remove from lock-list */
446
487
data->next->prev= data->prev;
450
491
lock->write.last=data->prev;
451
492
if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
494
if (lock->update_status)
495
(*lock->update_status)(data->status_param);
499
if (lock->restore_status)
500
(*lock->restore_status)(data->status_param);
455
502
if (lock_type == TL_READ_NO_INSERT)
456
503
lock->read_no_write_count--;
457
504
data->type=TL_UNLOCK; /* Mark unlocked */
458
505
wake_up_waiters(lock);
506
pthread_mutex_unlock(&lock->mutex);
502
550
data->prev=lock->write.last;
504
552
lock->write.last= &data->next;
553
if (data->type == TL_WRITE_CONCURRENT_INSERT &&
554
(*lock->check_status)(data->status_param))
555
data->type=TL_WRITE; /* Upgrade lock */
507
boost::condition_variable_any *cond= data->cond;
508
data->cond= NULL; /* Mark thread free */
509
cond->notify_one(); /* Start waiting thred */
557
pthread_cond_t *cond=data->cond;
558
data->cond=0; /* Mark thread free */
559
pthread_cond_signal(cond); /* Start waiting thread */
511
561
if (data->type != TL_WRITE_ALLOW_WRITE ||
512
562
!lock->write_wait.data ||
530
580
lock_type != TL_WRITE_ALLOW_WRITE) ||
531
581
!lock->read_no_write_count))
584
For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
585
start WRITE locks together with the READ locks
587
if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
588
(*lock->check_status)(data->status_param))
590
data->type=TL_WRITE; /* Upgrade lock */
591
if (lock->read_wait.data)
592
free_all_read_locks(lock,0);
534
boost::condition_variable_any *cond= data->cond;
596
pthread_cond_t *cond=data->cond;
535
597
if (((*data->prev)=data->next)) /* remove from wait-list */
536
598
data->next->prev= data->prev;
540
602
data->prev=lock->write.last;
541
603
lock->write.last= &data->next;
542
604
data->next=0; /* Only one write lock */
543
data->cond= NULL; /* Mark thread free */
544
cond->notify_one(); /* Start waiting thread */
605
data->cond=0; /* Mark thread free */
606
pthread_cond_signal(cond); /* Start waiting thread */
545
607
} while (lock_type == TL_WRITE_ALLOW_WRITE &&
546
608
(data=lock->write_wait.data) &&
547
609
data->type == TL_WRITE_ALLOW_WRITE);
593
653
enum enum_thr_lock_result
594
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
654
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
596
656
THR_LOCK_DATA **pos,**end;
599
659
/* lock everything */
600
660
for (pos=data,end=data+count; pos < end ; pos++)
602
enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
662
enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
603
663
if (result != THR_LOCK_SUCCESS)
605
665
thr_multi_unlock(data,(uint32_t) (pos-data));
682
if (last_lock->lock == (*pos)->lock &&
683
last_lock->lock->copy_status)
685
if (last_lock->type <= TL_READ_NO_INSERT)
687
THR_LOCK_DATA **read_lock;
689
If we are locking the same table with read locks we must ensure
690
that all tables share the status of the last write lock or
694
(*pos)->type <= TL_READ_NO_INSERT &&
696
pos[-1]->lock == (*pos)->lock ;
702
(last_lock->lock->copy_status)((*read_lock)->status_param,
703
(*pos)->status_param);
704
} while (*(read_lock++) != last_lock);
705
last_lock= (*pos); /* Point at last write lock */
708
(*last_lock->lock->copy_status)((*pos)->status_param,
709
last_lock->status_param);
623
713
} while (pos != data);
643
void DrizzleLock::unlock(uint32_t count)
645
THR_LOCK_DATA **pos,**end;
647
for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
649
if ((*pos)->type != TL_UNLOCK)
655
734
Abort all threads waiting for a lock. The lock will be upgraded to
656
735
TL_WRITE_ONLY to abort any new accesses to the lock
659
void THR_LOCK::abort_locks()
738
void thr_abort_locks(THR_LOCK *lock)
661
boost_unique_lock_t scopedLock(mutex);
741
pthread_mutex_lock(&lock->mutex);
663
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
743
for (data=lock->read_wait.data; data ; data=data->next)
665
local_data->type= TL_UNLOCK; /* Mark killed */
745
data->type= TL_UNLOCK; /* Mark killed */
666
746
/* It's safe to signal the cond first: we're still holding the mutex. */
667
local_data->cond->notify_one();
668
local_data->cond= NULL; /* Removed from list */
747
pthread_cond_signal(data->cond);
748
data->cond= NULL; /* Removed from list */
670
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
750
for (data=lock->write_wait.data; data ; data=data->next)
672
local_data->type= TL_UNLOCK;
673
local_data->cond->notify_one();
674
local_data->cond= NULL;
752
data->type=TL_UNLOCK;
753
pthread_cond_signal(data->cond);
676
read_wait.last= &read_wait.data;
677
write_wait.last= &write_wait.data;
678
read_wait.data= write_wait.data=0;
680
write.data->type=TL_WRITE_ONLY;
756
lock->read_wait.last= &lock->read_wait.data;
757
lock->write_wait.last= &lock->write_wait.data;
758
lock->read_wait.data=lock->write_wait.data=0;
759
if (lock->write.data)
760
lock->write.data->type=TL_WRITE_ONLY;
761
pthread_mutex_unlock(&lock->mutex);
687
769
This is used to abort all locks for a specific thread
690
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
772
bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
692
775
bool found= false;
694
boost_unique_lock_t scopedLock(mutex);
695
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
777
pthread_mutex_lock(&lock->mutex);
778
for (data= lock->read_wait.data; data ; data= data->next)
697
if (local_data->owner->info->thread_id == thread_id_arg)
780
if (data->owner->info->thread_id == thread_id)
699
local_data->type= TL_UNLOCK; /* Mark killed */
782
data->type= TL_UNLOCK; /* Mark killed */
700
783
/* It's safe to signal the cond first: we're still holding the mutex. */
702
local_data->cond->notify_one();
703
local_data->cond= 0; /* Removed from list */
785
pthread_cond_signal(data->cond);
786
data->cond= 0; /* Removed from list */
705
if (((*local_data->prev)= local_data->next))
706
local_data->next->prev= local_data->prev;
788
if (((*data->prev)= data->next))
789
data->next->prev= data->prev;
708
read_wait.last= local_data->prev;
791
lock->read_wait.last= data->prev;
711
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
794
for (data= lock->write_wait.data; data ; data= data->next)
713
if (local_data->owner->info->thread_id == thread_id_arg)
796
if (data->owner->info->thread_id == thread_id)
715
local_data->type= TL_UNLOCK;
798
data->type= TL_UNLOCK;
717
local_data->cond->notify_one();
718
local_data->cond= NULL;
800
pthread_cond_signal(data->cond);
720
if (((*local_data->prev)= local_data->next))
721
local_data->next->prev= local_data->prev;
803
if (((*data->prev)= data->next))
804
data->next->prev= data->prev;
723
write_wait.last= local_data->prev;
806
lock->write_wait.last= data->prev;
726
wake_up_waiters(this);
809
wake_up_waiters(lock);
810
pthread_mutex_unlock(&lock->mutex);
731
} /* namespace drizzled */