12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
Read and write locks for Posix threads. All threads must acquire
46
46
should put a pointer to the following functions in the lock structure:
47
47
(If the pointer is zero (default), the function is not called)
50
Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
51
we check if this function exists and returns 0.
52
If not, then the lock is upgraded to TL_WRITE
53
In MyISAM this is a simple check if the insert can be done
54
at the end of the datafile.
56
Before a write lock is released, this function is called.
57
In MyISAM this function updates the count and length of the datafile
59
When one gets a lock this function is called.
60
In MyISAM this stores the number of rows and size of the datafile
50
63
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
64
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
56
#include <drizzled/internal/my_sys.h>
57
#include <drizzled/internal/thread_var.h>
58
#include <drizzled/statistics_variables.h>
59
#include <drizzled/pthread_globals.h>
61
#include <drizzled/session.h>
69
#include "drizzled/internal/my_sys.h"
70
#include "drizzled/statistics_variables.h"
63
72
#include "thr_lock.h"
64
#include <drizzled/internal/m_string.h>
73
#include "drizzled/internal/m_string.h"
79
88
#include <drizzled/util/test.h>
81
#include <boost/interprocess/sync/lock_options.hpp>
83
90
using namespace std;
95
bool thr_lock_inited= false;
88
96
uint64_t table_lock_wait_timeout;
89
97
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
92
uint64_t max_write_lock_count= UINT64_MAX;
100
static list<THR_LOCK *> thr_lock_thread_list; /* List of threads in use */
102
uint64_t max_write_lock_count= ~(uint64_t) 0L;
104
static inline pthread_cond_t *get_cond(void)
106
return &my_thread_var->suspend;
95
110
** For the future (now the thread specific cond is allocated by my_pthread.c)
115
thr_lock_inited= true;
98
120
static inline bool
99
121
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
107
129
void thr_lock_init(THR_LOCK *lock)
131
memset(lock, 0, sizeof(*lock));
132
pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
110
133
lock->read.last= &lock->read.data;
111
134
lock->read_wait.last= &lock->read_wait.data;
112
135
lock->write_wait.last= &lock->write_wait.data;
113
136
lock->write.last= &lock->write.data;
117
void THR_LOCK_INFO::init()
138
pthread_mutex_lock(&internal::THR_LOCK_lock); /* Add to locks in use */
139
thr_lock_thread_list.push_front(lock);
140
pthread_mutex_unlock(&internal::THR_LOCK_lock);
144
void thr_lock_delete(THR_LOCK *lock)
146
pthread_mutex_destroy(&lock->mutex);
147
pthread_mutex_lock(&internal::THR_LOCK_lock);
148
thr_lock_thread_list.remove(lock);
149
pthread_mutex_unlock(&internal::THR_LOCK_lock);
153
void thr_lock_info_init(THR_LOCK_INFO *info)
119
155
internal::st_my_thread_var *tmp= my_thread_var;
156
info->thread= tmp->pthread_self;
157
info->thread_id= tmp->id;
124
161
/* Initialize a lock instance */
126
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
163
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
130
owner= NULL; /* no owner yet */
131
status_param= param_arg;
166
data->type= TL_UNLOCK;
167
data->owner= NULL; /* no owner yet */
168
data->status_param= param;
147
184
static void wake_up_waiters(THR_LOCK *lock);
150
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
187
static enum enum_thr_lock_result
188
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
152
internal::st_my_thread_var *thread_var= session.getThreadVar();
154
boost::condition_variable_any *cond= &thread_var->suspend;
191
internal::st_my_thread_var *thread_var= my_thread_var;
192
pthread_cond_t *cond= &thread_var->suspend;
193
struct timespec wait_timeout;
155
194
enum enum_thr_lock_result result= THR_LOCK_ABORTED;
156
195
bool can_deadlock= test(data->owner->info->n_cursors);
159
199
(*wait->last)=data; /* Wait for lock */
160
200
data->prev= wait->last;
161
201
wait->last= &data->next;
164
current_global_counters.locks_waited++;
204
status_var_increment(current_global_counters.locks_waited);
166
206
/* Set up control struct to allow others to abort locks */
167
thread_var->current_mutex= data->lock->native_handle();
168
thread_var->current_cond= &thread_var->suspend;
169
data->cond= &thread_var->suspend;;
207
thread_var->current_mutex= &data->lock->mutex;
208
thread_var->current_cond= cond;
171
while (not thread_var->abort)
212
set_timespec(wait_timeout, table_lock_wait_timeout);
213
while (!thread_var->abort || in_wait_list)
173
boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
178
xtime_get(&xt, boost::TIME_UTC);
179
xt.sec += table_lock_wait_timeout;
180
if (not cond->timed_wait(scoped, xt))
182
result= THR_LOCK_WAIT_TIMEOUT;
215
int rc= (can_deadlock ?
216
pthread_cond_timedwait(cond, &data->lock->mutex,
218
pthread_cond_wait(cond, &data->lock->mutex));
192
220
We must break the wait if one of the following occurs:
193
221
- the connection has been aborted (thread_var->abort is set), but
201
229
Order of checks below is important to not report about timeout
202
230
if the predicate is true.
204
if (data->cond == NULL)
236
if (rc == ETIMEDOUT || rc == ETIME)
238
result= THR_LOCK_WAIT_TIMEOUT;
211
242
if (data->cond || data->type == TL_UNLOCK)
225
256
result= THR_LOCK_SUCCESS;
257
if (data->lock->get_status)
258
(*data->lock->get_status)(data->status_param, 0);
227
data->lock->unlock();
260
pthread_mutex_unlock(&data->lock->mutex);
229
262
/* The following must be done after unlock of lock->mutex */
230
boost_unique_lock_t scopedLock(thread_var->mutex);
263
pthread_mutex_lock(&thread_var->mutex);
231
264
thread_var->current_mutex= NULL;
232
265
thread_var->current_cond= NULL;
266
pthread_mutex_unlock(&thread_var->mutex);
237
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
271
static enum enum_thr_lock_result
272
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
273
enum thr_lock_type lock_type)
239
THR_LOCK *lock= data->lock;
275
THR_LOCK *lock=data->lock;
240
276
enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
241
277
struct st_lock_list *wait_queue;
242
278
THR_LOCK_DATA *lock_owner;
245
281
data->cond=0; /* safety */
246
282
data->type=lock_type;
247
283
data->owner= owner; /* Must be reset ! */
284
pthread_mutex_lock(&lock->mutex);
249
285
if ((int) lock_type <= (int) TL_READ_NO_INSERT)
251
287
/* Request for READ lock */
271
307
lock->read.last= &data->next;
272
308
if (lock_type == TL_READ_NO_INSERT)
273
309
lock->read_no_write_count++;
274
current_global_counters.locks_immediate++;
310
if (lock->get_status)
311
(*lock->get_status)(data->status_param, 0);
312
status_var_increment(current_global_counters.locks_immediate);
277
315
if (lock->write.data->type == TL_WRITE_ONLY)
289
327
(*lock->read.last)=data; /* Add to running FIFO */
290
328
data->prev=lock->read.last;
291
329
lock->read.last= &data->next;
330
if (lock->get_status)
331
(*lock->get_status)(data->status_param, 0);
292
332
if (lock_type == TL_READ_NO_INSERT)
293
333
lock->read_no_write_count++;
294
current_global_counters.locks_immediate++;
334
status_var_increment(current_global_counters.locks_immediate);
304
344
else /* Request for WRITE lock */
306
if (lock_type == TL_WRITE_CONCURRENT_INSERT)
346
if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
307
347
data->type=lock_type= thr_upgraded_concurrent_insert_lock;
309
349
if (lock->write.data) /* If there is a write lock */
338
378
(*lock->write.last)=data; /* Add to running fifo */
339
379
data->prev=lock->write.last;
340
380
lock->write.last= &data->next;
341
current_global_counters.locks_immediate++;
381
if (data->lock->get_status)
382
(*data->lock->get_status)(data->status_param, 0);
383
status_var_increment(current_global_counters.locks_immediate);
361
408
(*lock->write.last)=data; /* Add as current write lock */
362
409
data->prev=lock->write.last;
363
410
lock->write.last= &data->next;
364
current_global_counters.locks_immediate++;
411
if (data->lock->get_status)
412
(*data->lock->get_status)(data->status_param, concurrent_insert);
413
status_var_increment(current_global_counters.locks_immediate);
380
429
result= THR_LOCK_DEADLOCK;
384
432
/* Can't get lock yet; Wait for it */
385
return(wait_for_lock(session, wait_queue, data));
433
return(wait_for_lock(wait_queue, data, 0));
435
pthread_mutex_unlock(&lock->mutex);
393
440
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
395
THR_LOCK_DATA *data= lock->read_wait.data;
442
THR_LOCK_DATA *data=lock->read_wait.data;
397
444
/* move all locks from read_wait list to read list */
398
445
(*lock->read.last)=data;
425
472
lock->read_no_write_count++;
427
data->cond= NULL; /* Mark thread free */
474
data->cond=0; /* Mark thread free */
475
pthread_cond_signal(cond);
429
476
} while ((data=data->next));
430
477
*lock->read_wait.last=0;
431
478
if (!lock->read_wait.data)
432
479
lock->write_lock_count=0;
435
/* Unlock lock and free next thread on same lock */
482
/* Unlock lock and free next thread on same lock */
437
484
static void thr_unlock(THR_LOCK_DATA *data)
439
486
THR_LOCK *lock=data->lock;
440
487
enum thr_lock_type lock_type=data->type;
488
pthread_mutex_lock(&lock->mutex);
443
490
if (((*data->prev)=data->next)) /* remove from lock-list */
444
491
data->next->prev= data->prev;
448
495
lock->write.last=data->prev;
449
496
if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
498
if (lock->update_status)
499
(*lock->update_status)(data->status_param);
503
if (lock->restore_status)
504
(*lock->restore_status)(data->status_param);
453
506
if (lock_type == TL_READ_NO_INSERT)
454
507
lock->read_no_write_count--;
455
508
data->type=TL_UNLOCK; /* Mark unlocked */
456
509
wake_up_waiters(lock);
510
pthread_mutex_unlock(&lock->mutex);
500
554
data->prev=lock->write.last;
502
556
lock->write.last= &data->next;
557
if (data->type == TL_WRITE_CONCURRENT_INSERT &&
558
(*lock->check_status)(data->status_param))
559
data->type=TL_WRITE; /* Upgrade lock */
505
boost::condition_variable_any *cond= data->cond;
506
data->cond= NULL; /* Mark thread free */
507
cond->notify_one(); /* Start waiting thred */
561
pthread_cond_t *cond=data->cond;
562
data->cond=0; /* Mark thread free */
563
pthread_cond_signal(cond); /* Start waiting thread */
509
565
if (data->type != TL_WRITE_ALLOW_WRITE ||
510
566
!lock->write_wait.data ||
528
584
lock_type != TL_WRITE_ALLOW_WRITE) ||
529
585
!lock->read_no_write_count))
588
For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
589
start WRITE locks together with the READ locks
591
if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
592
(*lock->check_status)(data->status_param))
594
data->type=TL_WRITE; /* Upgrade lock */
595
if (lock->read_wait.data)
596
free_all_read_locks(lock,0);
532
boost::condition_variable_any *cond= data->cond;
600
pthread_cond_t *cond=data->cond;
533
601
if (((*data->prev)=data->next)) /* remove from wait-list */
534
602
data->next->prev= data->prev;
538
606
data->prev=lock->write.last;
539
607
lock->write.last= &data->next;
540
608
data->next=0; /* Only one write lock */
541
data->cond= NULL; /* Mark thread free */
542
cond->notify_one(); /* Start waiting thread */
609
data->cond=0; /* Mark thread free */
610
pthread_cond_signal(cond); /* Start waiting thread */
543
611
} while (lock_type == TL_WRITE_ALLOW_WRITE &&
544
612
(data=lock->write_wait.data) &&
545
613
data->type == TL_WRITE_ALLOW_WRITE);
591
657
enum enum_thr_lock_result
592
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
658
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
594
660
THR_LOCK_DATA **pos,**end;
597
663
/* lock everything */
598
664
for (pos=data,end=data+count; pos < end ; pos++)
600
enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
666
enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
601
667
if (result != THR_LOCK_SUCCESS)
603
669
thr_multi_unlock(data,(uint32_t) (pos-data));
686
if (last_lock->lock == (*pos)->lock &&
687
last_lock->lock->copy_status)
689
if (last_lock->type <= TL_READ_NO_INSERT)
691
THR_LOCK_DATA **read_lock;
693
If we are locking the same table with read locks we must ensure
694
that all tables share the status of the last write lock or
698
(*pos)->type <= TL_READ_NO_INSERT &&
700
pos[-1]->lock == (*pos)->lock ;
706
(last_lock->lock->copy_status)((*read_lock)->status_param,
707
(*pos)->status_param);
708
} while (*(read_lock++) != last_lock);
709
last_lock= (*pos); /* Point at last write lock */
712
(*last_lock->lock->copy_status)((*pos)->status_param,
713
last_lock->status_param);
621
717
} while (pos != data);
641
void DrizzleLock::unlock(uint32_t count)
643
THR_LOCK_DATA **pos,**end;
645
for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
647
if ((*pos)->type != TL_UNLOCK)
653
738
Abort all threads waiting for a lock. The lock will be upgraded to
654
739
TL_WRITE_ONLY to abort any new accesses to the lock
657
void THR_LOCK::abort_locks()
742
void thr_abort_locks(THR_LOCK *lock)
659
boost_unique_lock_t scopedLock(mutex);
745
pthread_mutex_lock(&lock->mutex);
661
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
747
for (data=lock->read_wait.data; data ; data=data->next)
663
local_data->type= TL_UNLOCK; /* Mark killed */
749
data->type= TL_UNLOCK; /* Mark killed */
664
750
/* It's safe to signal the cond first: we're still holding the mutex. */
665
local_data->cond->notify_one();
666
local_data->cond= NULL; /* Removed from list */
751
pthread_cond_signal(data->cond);
752
data->cond= NULL; /* Removed from list */
668
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
754
for (data=lock->write_wait.data; data ; data=data->next)
670
local_data->type= TL_UNLOCK;
671
local_data->cond->notify_one();
672
local_data->cond= NULL;
756
data->type=TL_UNLOCK;
757
pthread_cond_signal(data->cond);
674
read_wait.last= &read_wait.data;
675
write_wait.last= &write_wait.data;
676
read_wait.data= write_wait.data=0;
678
write.data->type=TL_WRITE_ONLY;
760
lock->read_wait.last= &lock->read_wait.data;
761
lock->write_wait.last= &lock->write_wait.data;
762
lock->read_wait.data=lock->write_wait.data=0;
763
if (lock->write.data)
764
lock->write.data->type=TL_WRITE_ONLY;
765
pthread_mutex_unlock(&lock->mutex);
685
773
This is used to abort all locks for a specific thread
688
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
776
bool thr_abort_locks_for_thread(THR_LOCK *lock, uint64_t thread_id)
690
779
bool found= false;
692
boost_unique_lock_t scopedLock(mutex);
693
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
781
pthread_mutex_lock(&lock->mutex);
782
for (data= lock->read_wait.data; data ; data= data->next)
695
if (local_data->owner->info->thread_id == thread_id_arg)
784
if (data->owner->info->thread_id == thread_id)
697
local_data->type= TL_UNLOCK; /* Mark killed */
786
data->type= TL_UNLOCK; /* Mark killed */
698
787
/* It's safe to signal the cond first: we're still holding the mutex. */
700
local_data->cond->notify_one();
701
local_data->cond= 0; /* Removed from list */
789
pthread_cond_signal(data->cond);
790
data->cond= 0; /* Removed from list */
703
if (((*local_data->prev)= local_data->next))
704
local_data->next->prev= local_data->prev;
792
if (((*data->prev)= data->next))
793
data->next->prev= data->prev;
706
read_wait.last= local_data->prev;
795
lock->read_wait.last= data->prev;
709
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
798
for (data= lock->write_wait.data; data ; data= data->next)
711
if (local_data->owner->info->thread_id == thread_id_arg)
800
if (data->owner->info->thread_id == thread_id)
713
local_data->type= TL_UNLOCK;
802
data->type= TL_UNLOCK;
715
local_data->cond->notify_one();
716
local_data->cond= NULL;
804
pthread_cond_signal(data->cond);
718
if (((*local_data->prev)= local_data->next))
719
local_data->next->prev= local_data->prev;
807
if (((*data->prev)= data->next))
808
data->next->prev= data->prev;
721
write_wait.last= local_data->prev;
810
lock->write_wait.last= data->prev;
724
wake_up_waiters(this);
813
wake_up_waiters(lock);
814
pthread_mutex_unlock(&lock->mutex);
729
818
} /* namespace drizzled */