46
50
should put a pointer to the following functions in the lock structure:
47
51
(If the pointer is zero (default), the function is not called)
54
Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
55
we check if this function exists and returns 0.
56
If not, then the lock is upgraded to TL_WRITE_LOCK
57
In MyISAM this is a simple check if the insert can be done
58
at the end of the datafile.
60
Before a write lock is released, this function is called.
61
In MyISAM this function updates the count and length of the datafile
63
When one gets a lock this function is called.
64
In MyISAM this stores the number of rows and size of the datafile
50
67
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
68
TL_WRITE_CONCURRENT_INSERT or one TL_WRITE_DELAYED lock at the same time as
56
#include <drizzled/internal/my_sys.h>
57
#include <drizzled/internal/thread_var.h>
58
#include <drizzled/statistics_variables.h>
59
#include <drizzled/pthread_globals.h>
61
#include <drizzled/session.h>
73
#include "mysys_priv.h"
63
75
#include "thr_lock.h"
64
#include <drizzled/internal/m_string.h>
76
#include <mystrings/m_string.h>
68
79
#if TIME_WITH_SYS_TIME
69
80
# include <sys/time.h>
107
125
void thr_lock_init(THR_LOCK *lock)
127
memset(lock, 0, sizeof(*lock));
128
pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
110
129
lock->read.last= &lock->read.data;
111
130
lock->read_wait.last= &lock->read_wait.data;
112
131
lock->write_wait.last= &lock->write_wait.data;
113
132
lock->write.last= &lock->write.data;
117
void THR_LOCK_INFO::init()
119
internal::st_my_thread_var *tmp= my_thread_var;
134
pthread_mutex_lock(&THR_LOCK_lock); /* Add to locks in use */
135
lock->list.data=(void*) lock;
136
thr_lock_thread_list=list_add(thr_lock_thread_list,&lock->list);
137
pthread_mutex_unlock(&THR_LOCK_lock);
142
void thr_lock_delete(THR_LOCK *lock)
144
pthread_mutex_destroy(&lock->mutex);
145
pthread_mutex_lock(&THR_LOCK_lock);
146
thr_lock_thread_list=list_delete(thr_lock_thread_list,&lock->list);
147
pthread_mutex_unlock(&THR_LOCK_lock);
152
void thr_lock_info_init(THR_LOCK_INFO *info)
154
struct st_my_thread_var *tmp= my_thread_var;
155
info->thread= tmp->pthread_self;
156
info->thread_id= tmp->id;
124
160
/* Initialize a lock instance */
126
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
162
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
130
owner= NULL; /* no owner yet */
131
status_param= param_arg;
165
data->type=TL_UNLOCK;
166
data->owner= 0; /* no owner yet */
167
data->status_param=param;
139
175
for ( ; data ; data=data->next)
141
177
if (thr_lock_owner_equal(data->owner, owner))
142
return true; /* Already locked by thread */
178
return 1; /* Already locked by thread */
183
static inline bool have_specific_lock(THR_LOCK_DATA *data,
184
enum thr_lock_type type)
186
for ( ; data ; data=data->next)
188
if (data->type == type)
147
195
static void wake_up_waiters(THR_LOCK *lock);
150
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
198
static enum enum_thr_lock_result
199
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
152
internal::st_my_thread_var *thread_var= session.getThreadVar();
154
boost::condition_variable_any *cond= &thread_var->suspend;
202
struct st_my_thread_var *thread_var= my_thread_var;
203
pthread_cond_t *cond= &thread_var->suspend;
204
struct timespec wait_timeout;
155
205
enum enum_thr_lock_result result= THR_LOCK_ABORTED;
156
206
bool can_deadlock= test(data->owner->info->n_cursors);
159
210
(*wait->last)=data; /* Wait for lock */
160
211
data->prev= wait->last;
161
212
wait->last= &data->next;
164
current_global_counters.locks_waited++;
215
statistic_increment(locks_waited, &THR_LOCK_lock);
166
217
/* Set up control struct to allow others to abort locks */
167
thread_var->current_mutex= data->lock->native_handle();
168
thread_var->current_cond= &thread_var->suspend;
169
data->cond= &thread_var->suspend;;
218
thread_var->current_mutex= &data->lock->mutex;
219
thread_var->current_cond= cond;
171
while (not thread_var->abort)
223
set_timespec(wait_timeout, table_lock_wait_timeout);
224
while (!thread_var->abort || in_wait_list)
173
boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
178
xtime_get(&xt, boost::TIME_UTC);
179
xt.sec += table_lock_wait_timeout;
180
if (not cond->timed_wait(scoped, xt))
182
result= THR_LOCK_WAIT_TIMEOUT;
226
int rc= (can_deadlock ?
227
pthread_cond_timedwait(cond, &data->lock->mutex,
229
pthread_cond_wait(cond, &data->lock->mutex));
192
231
We must break the wait if one of the following occurs:
193
232
- the connection has been aborted (!thread_var->abort), but
225
269
result= THR_LOCK_SUCCESS;
270
if (data->lock->get_status)
271
(*data->lock->get_status)(data->status_param, 0);
227
data->lock->unlock();
273
pthread_mutex_unlock(&data->lock->mutex);
229
275
/* The following must be done after unlock of lock->mutex */
230
boost_unique_lock_t scopedLock(thread_var->mutex);
231
thread_var->current_mutex= NULL;
232
thread_var->current_cond= NULL;
276
pthread_mutex_lock(&thread_var->mutex);
277
thread_var->current_mutex= 0;
278
thread_var->current_cond= 0;
279
pthread_mutex_unlock(&thread_var->mutex);
237
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
284
enum enum_thr_lock_result
285
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
286
enum thr_lock_type lock_type)
239
THR_LOCK *lock= data->lock;
288
THR_LOCK *lock=data->lock;
240
289
enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
241
290
struct st_lock_list *wait_queue;
242
291
THR_LOCK_DATA *lock_owner;
304
358
else /* Request for WRITE lock */
306
if (lock_type == TL_WRITE_CONCURRENT_INSERT)
360
if (lock_type == TL_WRITE_DELAYED)
362
if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY)
364
data->type=TL_UNLOCK;
365
result= THR_LOCK_ABORTED; /* Can't wait for this one */
369
if there is a TL_WRITE_ALLOW_READ lock, we have to wait for a lock
370
(TL_WRITE_ALLOW_READ is used for ALTER TABLE in MySQL)
372
if ((!lock->write.data ||
373
lock->write.data->type != TL_WRITE_ALLOW_READ) &&
374
!have_specific_lock(lock->write_wait.data,TL_WRITE_ALLOW_READ) &&
375
(lock->write.data || lock->read.data))
377
/* Add delayed write lock to write_wait queue, and return at once */
378
(*lock->write_wait.last)=data;
379
data->prev=lock->write_wait.last;
380
lock->write_wait.last= &data->next;
381
data->cond=get_cond();
383
We don't have to do get_status here as we will do it when we change
384
the delayed lock to a real write lock
386
statistic_increment(locks_immediate,&THR_LOCK_lock);
390
else if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
307
391
data->type=lock_type= thr_upgraded_concurrent_insert_lock;
309
393
if (lock->write.data) /* If there is a write lock */
425
517
lock->read_no_write_count++;
427
data->cond= NULL; /* Mark thread free */
519
data->cond=0; /* Mark thread free */
520
pthread_cond_signal(cond);
429
521
} while ((data=data->next));
430
522
*lock->read_wait.last=0;
431
523
if (!lock->read_wait.data)
432
524
lock->write_lock_count=0;
435
/* Unlock lock and free next thread on same lock */
527
/* Unlock lock and free next thread on same lock */
437
static void thr_unlock(THR_LOCK_DATA *data)
529
void thr_unlock(THR_LOCK_DATA *data)
439
531
THR_LOCK *lock=data->lock;
440
532
enum thr_lock_type lock_type=data->type;
533
pthread_mutex_lock(&lock->mutex);
443
535
if (((*data->prev)=data->next)) /* remove from lock-list */
444
536
data->next->prev= data->prev;
445
537
else if (lock_type <= TL_READ_NO_INSERT)
446
538
lock->read.last=data->prev;
539
else if (lock_type == TL_WRITE_DELAYED && data->cond)
542
This only happens in extreme circumstances when a
543
write delayed lock that is waiting for a lock
545
lock->write_wait.last=data->prev; /* Put it on wait queue */
448
548
lock->write.last=data->prev;
449
549
if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
551
if (lock->update_status)
552
(*lock->update_status)(data->status_param);
556
if (lock->restore_status)
557
(*lock->restore_status)(data->status_param);
453
559
if (lock_type == TL_READ_NO_INSERT)
454
560
lock->read_no_write_count--;
455
561
data->type=TL_UNLOCK; /* Mark unlocked */
456
562
wake_up_waiters(lock);
563
pthread_mutex_unlock(&lock->mutex);
641
void DrizzleLock::unlock(uint32_t count)
643
THR_LOCK_DATA **pos,**end;
645
for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
647
if ((*pos)->type != TL_UNLOCK)
653
792
Abort all threads waiting for a lock. The lock will be upgraded to
654
793
TL_WRITE_ONLY to abort any new accesses to the lock
657
void THR_LOCK::abort_locks()
796
void thr_abort_locks(THR_LOCK *lock, bool upgrade_lock)
659
boost_unique_lock_t scopedLock(mutex);
799
pthread_mutex_lock(&lock->mutex);
661
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
801
for (data=lock->read_wait.data; data ; data=data->next)
663
local_data->type= TL_UNLOCK; /* Mark killed */
803
data->type=TL_UNLOCK; /* Mark killed */
664
804
/* It's safe to signal the cond first: we're still holding the mutex. */
665
local_data->cond->notify_one();
666
local_data->cond= NULL; /* Removed from list */
805
pthread_cond_signal(data->cond);
806
data->cond=0; /* Removed from list */
668
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
808
for (data=lock->write_wait.data; data ; data=data->next)
670
local_data->type= TL_UNLOCK;
671
local_data->cond->notify_one();
672
local_data->cond= NULL;
810
data->type=TL_UNLOCK;
811
pthread_cond_signal(data->cond);
674
read_wait.last= &read_wait.data;
675
write_wait.last= &write_wait.data;
676
read_wait.data= write_wait.data=0;
678
write.data->type=TL_WRITE_ONLY;
814
lock->read_wait.last= &lock->read_wait.data;
815
lock->write_wait.last= &lock->write_wait.data;
816
lock->read_wait.data=lock->write_wait.data=0;
817
if (upgrade_lock && lock->write.data)
818
lock->write.data->type=TL_WRITE_ONLY;
819
pthread_mutex_unlock(&lock->mutex);
685
827
This is used to abort all locks for a specific thread
688
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
830
bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
690
833
bool found= false;
692
boost_unique_lock_t scopedLock(mutex);
693
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
835
pthread_mutex_lock(&lock->mutex);
836
for (data= lock->read_wait.data; data ; data= data->next)
695
if (local_data->owner->info->thread_id == thread_id_arg)
838
if (data->owner->info->thread_id == thread_id) /* purecov: tested */
697
local_data->type= TL_UNLOCK; /* Mark killed */
840
data->type= TL_UNLOCK; /* Mark killed */
698
841
/* It's safe to signal the cond first: we're still holding the mutex. */
700
local_data->cond->notify_one();
701
local_data->cond= 0; /* Removed from list */
843
pthread_cond_signal(data->cond);
844
data->cond= 0; /* Removed from list */
703
if (((*local_data->prev)= local_data->next))
704
local_data->next->prev= local_data->prev;
846
if (((*data->prev)= data->next))
847
data->next->prev= data->prev;
706
read_wait.last= local_data->prev;
849
lock->read_wait.last= data->prev;
709
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
852
for (data= lock->write_wait.data; data ; data= data->next)
711
if (local_data->owner->info->thread_id == thread_id_arg)
854
if (data->owner->info->thread_id == thread_id) /* purecov: tested */
713
local_data->type= TL_UNLOCK;
856
data->type= TL_UNLOCK;
715
local_data->cond->notify_one();
716
local_data->cond= NULL;
858
pthread_cond_signal(data->cond);
718
if (((*local_data->prev)= local_data->next))
719
local_data->next->prev= local_data->prev;
861
if (((*data->prev)= data->next))
862
data->next->prev= data->prev;
721
write_wait.last= local_data->prev;
724
wake_up_waiters(this);
729
} /* namespace drizzled */
864
lock->write_wait.last= data->prev;
867
wake_up_waiters(lock);
868
pthread_mutex_unlock(&lock->mutex);
874
Downgrade a WRITE_* to a lower WRITE level
876
thr_downgrade_write_lock()
877
in_data Lock data of thread downgrading its lock
878
new_lock_type New write lock type
882
This can be used to downgrade a lock already owned. When the downgrade
883
occurs also other waiters, both readers and writers can be allowed to
885
The previous lock is often TL_WRITE_ONLY but can also be
886
TL_WRITE and TL_WRITE_ALLOW_READ. The normal downgrade variants are
887
TL_WRITE_ONLY => TL_WRITE_ALLOW_READ After a short exclusive lock
888
TL_WRITE_ALLOW_READ => TL_WRITE_ALLOW_WRITE After discovering that the
889
operation didn't need such a high lock.
890
TL_WRITE_ONLY => TL_WRITE after a short exclusive lock while holding a
892
TL_WRITE_ONLY => TL_WRITE_ALLOW_WRITE After a short exclusive lock after
893
already earlier having downgraded lock to TL_WRITE_ALLOW_WRITE
894
The implementation is conservative and would rather not start than
895
go on unknown paths to start, the common cases are handled.
898
In its current implementation it is only allowed to downgrade from
899
TL_WRITE_ONLY. In this case there are no waiters. Thus no wake up
903
void thr_downgrade_write_lock(THR_LOCK_DATA *in_data,
904
enum thr_lock_type new_lock_type)
906
THR_LOCK *lock=in_data->lock;
908
pthread_mutex_lock(&lock->mutex);
909
in_data->type= new_lock_type;
911
pthread_mutex_unlock(&lock->mutex);
915
/* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */
917
bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
919
THR_LOCK *lock=data->lock;
921
pthread_mutex_lock(&lock->mutex);
922
if (data->type == TL_UNLOCK || data->type >= TL_WRITE_LOW_PRIORITY)
924
pthread_mutex_unlock(&lock->mutex);
925
return(data->type == TL_UNLOCK); /* Test if Aborted */
927
/* TODO: Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */
928
data->type=TL_WRITE; /* Upgrade lock */
930
/* Check if someone has given us the lock */
933
if (!lock->read.data) /* No read locks */
934
{ /* We have the lock */
935
if (data->lock->get_status)
936
(*data->lock->get_status)(data->status_param, 0);
937
pthread_mutex_unlock(&lock->mutex);
941
if (((*data->prev)=data->next)) /* remove from lock-list */
942
data->next->prev= data->prev;
944
lock->write.last=data->prev;
946
if ((data->next=lock->write_wait.data)) /* Put first in lock_list */
947
data->next->prev= &data->next;
949
lock->write_wait.last= &data->next;
950
data->prev= &lock->write_wait.data;
951
lock->write_wait.data=data;
954
return(wait_for_lock(&lock->write_wait,data,1));
958
/* Downgrade a WRITE lock to a WRITE_DELAY lock if there are pending locks */
960
bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
962
THR_LOCK *lock=data->lock;
964
pthread_mutex_lock(&lock->mutex);
965
if (!lock->read_wait.data) /* No waiting read locks */
967
pthread_mutex_unlock(&lock->mutex);
971
data->type=TL_WRITE_DELAYED;
972
if (lock->update_status)
973
(*lock->update_status)(data->status_param);
974
if (((*data->prev)=data->next)) /* remove from lock-list */
975
data->next->prev= data->prev;
977
lock->write.last=data->prev;
979
if ((data->next=lock->write_wait.data)) /* Put first in lock_list */
980
data->next->prev= &data->next;
982
lock->write_wait.last= &data->next;
983
data->prev= &lock->write_wait.data;
984
data->cond=get_cond(); /* This was zero */
985
lock->write_wait.data=data;
986
free_all_read_locks(lock,0);
988
pthread_mutex_unlock(&lock->mutex);
989
return(thr_upgrade_write_delay_lock(data));