46
50
should put a pointer to the following functions in the lock structure:
47
51
(If the pointer is zero (default), the function is not called)
54
Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
55
we check if this function exists and returns 0.
56
If not, then the lock is upgraded to TL_WRITE_LOCK
57
In MyISAM this is a simple check if the insert can be done
58
at the end of the datafile.
60
Before a write lock is released, this function is called.
61
In MyISAM this function updates the count and length of the datafile
63
When one gets a lock this function is called.
64
In MyISAM this stores the number of rows and size of the datafile
50
67
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
68
TL_WRITE_CONCURRENT_INSERT or one TL_WRITE_DELAYED lock at the same time as
56
#include "drizzled/internal/my_sys.h"
57
#include "drizzled/internal/thread_var.h"
58
#include "drizzled/statistics_variables.h"
60
#include "drizzled/session.h"
61
#include "drizzled/current_session.h"
73
#include "mysys_priv.h"
63
75
#include "thr_lock.h"
64
#include "drizzled/internal/m_string.h"
76
#include <mystrings/m_string.h>
68
#if TIME_WITH_SYS_TIME
69
# include <sys/time.h>
73
# include <sys/time.h>
79
#include <drizzled/util/test.h>
79
bool thr_lock_inited=0;
80
uint32_t locks_immediate = 0L, locks_waited = 0L;
81
ulong table_lock_wait_timeout;
82
enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
84
/* The following constants are only for debug output */
85
#define MAX_THREADS 100
89
LIST *thr_lock_thread_list; /* List of threads in use */
90
ulong max_write_lock_count= ~(ulong) 0L;
92
static inline pthread_cond_t *get_cond(void)
86
uint64_t table_lock_wait_timeout;
87
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
90
uint64_t max_write_lock_count= ~(uint64_t) 0L;
94
return &my_thread_var->suspend;
93
98
** For the future (now the thread specific cond is alloced by my_pthread.c)
96
107
static inline bool
97
108
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
99
110
return rhs == lhs;
114
#define MAX_FOUND_ERRORS 10 /* Report 10 first errors */
115
static uint found_errors=0;
117
static int check_lock(struct st_lock_list *list, const char* lock_type,
118
const char *where, bool same_owner, bool no_cond)
120
THR_LOCK_DATA *data,**prev;
122
THR_LOCK_OWNER *first_owner;
127
enum thr_lock_type last_lock_type=list->data->type;
129
if (same_owner && list->data)
130
first_owner= list->data->owner;
131
for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next)
133
if (data->type != last_lock_type)
134
last_lock_type=TL_IGNORE;
135
if (data->prev != prev)
138
"Warning: prev link %d didn't point at previous lock at %s: %s\n",
139
count, lock_type, where);
143
!thr_lock_owner_equal(data->owner, first_owner) &&
144
last_lock_type != TL_WRITE_ALLOW_WRITE)
147
"Warning: Found locks from different threads in %s: %s\n",
151
if (no_cond && data->cond)
154
"Warning: Found active lock with not reset cond %s: %s\n",
162
fprintf(stderr,"Warning: found too many locks at %s: %s\n",
167
if (prev != list->last)
169
fprintf(stderr,"Warning: last didn't point at last lock at %s: %s\n",
177
static void check_locks(THR_LOCK *lock, const char *where,
180
uint old_found_errors=found_errors;
182
if (found_errors < MAX_FOUND_ERRORS)
184
if (check_lock(&lock->write,"write",where,1,1) |
185
check_lock(&lock->write_wait,"write_wait",where,0,0) |
186
check_lock(&lock->read,"read",where,0,1) |
187
check_lock(&lock->read_wait,"read_wait",where,0,0))
190
if (found_errors < MAX_FOUND_ERRORS)
194
for (data=lock->read.data ; data ; data=data->next)
196
if ((int) data->type == (int) TL_READ_NO_INSERT)
198
/* Protect against infinite loop. */
199
assert(count <= lock->read_no_write_count);
201
if (count != lock->read_no_write_count)
205
"Warning at '%s': Locks read_no_write_count was %u when it should have been %u\n", where, lock->read_no_write_count,count);
208
if (!lock->write.data)
210
if (!allow_no_locks && !lock->read.data &&
211
(lock->write_wait.data || lock->read_wait.data))
215
"Warning at '%s': No locks in use but locks are in wait queue\n",
218
if (!lock->write_wait.data)
220
if (!allow_no_locks && lock->read_wait.data)
224
"Warning at '%s': No write locks and waiting read locks\n",
230
if (!allow_no_locks &&
231
(((lock->write_wait.data->type == TL_WRITE_CONCURRENT_INSERT ||
232
lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE) &&
233
!lock->read_no_write_count) ||
234
lock->write_wait.data->type == TL_WRITE_ALLOW_READ ||
235
(lock->write_wait.data->type == TL_WRITE_DELAYED &&
240
"Warning at '%s': Write lock %d waiting while no exclusive read locks\n",where,(int) lock->write_wait.data->type);
245
{ /* Have write lock */
246
if (lock->write_wait.data)
248
if (!allow_no_locks &&
249
lock->write.data->type == TL_WRITE_ALLOW_WRITE &&
250
lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE)
254
"Warning at '%s': Found WRITE_ALLOW_WRITE lock waiting for WRITE_ALLOW_WRITE lock\n",
260
if (!thr_lock_owner_equal(lock->write.data->owner,
261
lock->read.data->owner) &&
262
((lock->write.data->type > TL_WRITE_DELAYED &&
263
lock->write.data->type != TL_WRITE_ONLY) ||
264
((lock->write.data->type == TL_WRITE_CONCURRENT_INSERT ||
265
lock->write.data->type == TL_WRITE_ALLOW_WRITE) &&
266
lock->read_no_write_count)))
270
"Warning at '%s': Found lock of type %d that is write and read locked\n",
271
where, lock->write.data->type);
274
if (lock->read_wait.data)
276
if (!allow_no_locks && lock->write.data->type <= TL_WRITE_DELAYED &&
277
lock->read_wait.data->type <= TL_READ_HIGH_PRIORITY)
281
"Warning at '%s': Found read lock of type %d waiting for write lock of type %d\n",
283
(int) lock->read_wait.data->type,
284
(int) lock->write.data->type);
293
#else /* EXTRA_DEBUG */
294
#define check_locks(A,B,C)
103
298
/* Initialize a lock */
105
300
void thr_lock_init(THR_LOCK *lock)
302
memset(lock, 0, sizeof(*lock));
303
VOID(pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST));
108
304
lock->read.last= &lock->read.data;
109
305
lock->read_wait.last= &lock->read_wait.data;
110
306
lock->write_wait.last= &lock->write_wait.data;
111
307
lock->write.last= &lock->write.data;
115
void THR_LOCK_INFO::init()
117
internal::st_my_thread_var *tmp= my_thread_var;
118
thread= tmp->pthread_self;
309
pthread_mutex_lock(&THR_LOCK_lock); /* Add to locks in use */
310
lock->list.data=(void*) lock;
311
thr_lock_thread_list=list_add(thr_lock_thread_list,&lock->list);
312
pthread_mutex_unlock(&THR_LOCK_lock);
317
void thr_lock_delete(THR_LOCK *lock)
319
VOID(pthread_mutex_destroy(&lock->mutex));
320
pthread_mutex_lock(&THR_LOCK_lock);
321
thr_lock_thread_list=list_delete(thr_lock_thread_list,&lock->list);
322
pthread_mutex_unlock(&THR_LOCK_lock);
327
void thr_lock_info_init(THR_LOCK_INFO *info)
329
struct st_my_thread_var *tmp= my_thread_var;
330
info->thread= tmp->pthread_self;
331
info->thread_id= tmp->id;
123
335
/* Initialize a lock instance */
125
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
337
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
129
owner= NULL; /* no owner yet */
130
status_param= param_arg;
340
data->type=TL_UNLOCK;
341
data->owner= 0; /* no owner yet */
342
data->status_param=param;
666
1029
This is used to abort all locks for a specific thread
669
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
1032
bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
1034
THR_LOCK_DATA *data;
671
1035
bool found= false;
673
boost::mutex::scoped_lock scopedLock(mutex);
674
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
1037
pthread_mutex_lock(&lock->mutex);
1038
for (data= lock->read_wait.data; data ; data= data->next)
676
if (local_data->owner->info->thread_id == thread_id_arg)
1040
if (data->owner->info->thread_id == thread_id) /* purecov: tested */
678
local_data->type= TL_UNLOCK; /* Mark killed */
1042
data->type= TL_UNLOCK; /* Mark killed */
679
1043
/* It's safe to signal the cond first: we're still holding the mutex. */
681
local_data->cond->notify_one();
682
local_data->cond= 0; /* Removed from list */
1045
pthread_cond_signal(data->cond);
1046
data->cond= 0; /* Removed from list */
684
if (((*local_data->prev)= local_data->next))
685
local_data->next->prev= local_data->prev;
1048
if (((*data->prev)= data->next))
1049
data->next->prev= data->prev;
687
read_wait.last= local_data->prev;
1051
lock->read_wait.last= data->prev;
690
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
1054
for (data= lock->write_wait.data; data ; data= data->next)
692
if (local_data->owner->info->thread_id == thread_id_arg)
1056
if (data->owner->info->thread_id == thread_id) /* purecov: tested */
694
local_data->type= TL_UNLOCK;
1058
data->type= TL_UNLOCK;
696
local_data->cond->notify_one();
697
local_data->cond= NULL;
699
if (((*local_data->prev)= local_data->next))
700
local_data->next->prev= local_data->prev;
702
write_wait.last= local_data->prev;
705
wake_up_waiters(this);
710
} /* namespace drizzled */
1060
pthread_cond_signal(data->cond);
1063
if (((*data->prev)= data->next))
1064
data->next->prev= data->prev;
1066
lock->write_wait.last= data->prev;
1069
wake_up_waiters(lock);
1070
pthread_mutex_unlock(&lock->mutex);
1076
Downgrade a WRITE_* to a lower WRITE level
1078
thr_downgrade_write_lock()
1079
in_data Lock data of thread downgrading its lock
1080
new_lock_type New write lock type
1084
This can be used to downgrade a lock already owned. When the downgrade
1085
occurs also other waiters, both readers and writers can be allowed to
1087
The previous lock is often TL_WRITE_ONLY but can also be
1088
TL_WRITE and TL_WRITE_ALLOW_READ. The normal downgrade variants are
1089
TL_WRITE_ONLY => TL_WRITE_ALLOW_READ After a short exclusive lock
1090
TL_WRITE_ALLOW_READ => TL_WRITE_ALLOW_WRITE After discovering that the
1091
operation didn't need such a high lock.
1092
TL_WRITE_ONLY => TL_WRITE after a short exclusive lock while holding a
1094
TL_WRITE_ONLY => TL_WRITE_ALLOW_WRITE After a short exclusive lock after
1095
already earlier having downgraded lock to TL_WRITE_ALLOW_WRITE
1096
The implementation is conservative and would rather not start waiters than
1097
go on unknown paths to start, the common cases are handled.
1100
In its current implementation it is only allowed to downgrade from
1101
TL_WRITE_ONLY. In this case there are no waiters. Thus no wake up
1105
void thr_downgrade_write_lock(THR_LOCK_DATA *in_data,
1106
enum thr_lock_type new_lock_type)
1108
THR_LOCK *lock=in_data->lock;
1110
pthread_mutex_lock(&lock->mutex);
1111
in_data->type= new_lock_type;
1112
check_locks(lock,"after downgrading lock",0);
1114
pthread_mutex_unlock(&lock->mutex);
1118
/* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */
1120
bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
1122
THR_LOCK *lock=data->lock;
1124
pthread_mutex_lock(&lock->mutex);
1125
if (data->type == TL_UNLOCK || data->type >= TL_WRITE_LOW_PRIORITY)
1127
pthread_mutex_unlock(&lock->mutex);
1128
return(data->type == TL_UNLOCK); /* Test if Aborted */
1130
check_locks(lock,"before upgrading lock",0);
1131
/* TODO: Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */
1132
data->type=TL_WRITE; /* Upgrade lock */
1134
/* Check if someone has given us the lock */
1137
if (!lock->read.data) /* No read locks */
1138
{ /* We have the lock */
1139
if (data->lock->get_status)
1140
(*data->lock->get_status)(data->status_param, 0);
1141
pthread_mutex_unlock(&lock->mutex);
1145
if (((*data->prev)=data->next)) /* remove from lock-list */
1146
data->next->prev= data->prev;
1148
lock->write.last=data->prev;
1150
if ((data->next=lock->write_wait.data)) /* Put first in lock_list */
1151
data->next->prev= &data->next;
1153
lock->write_wait.last= &data->next;
1154
data->prev= &lock->write_wait.data;
1155
lock->write_wait.data=data;
1156
check_locks(lock,"upgrading lock",0);
1160
check_locks(lock,"waiting for lock",0);
1162
return(wait_for_lock(&lock->write_wait,data,1));
1166
/* Downgrade a WRITE lock to a WRITE_DELAY lock if there are pending locks */
1168
bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
1170
THR_LOCK *lock=data->lock;
1172
pthread_mutex_lock(&lock->mutex);
1173
if (!lock->read_wait.data) /* No waiting read locks */
1175
pthread_mutex_unlock(&lock->mutex);
1179
data->type=TL_WRITE_DELAYED;
1180
if (lock->update_status)
1181
(*lock->update_status)(data->status_param);
1182
if (((*data->prev)=data->next)) /* remove from lock-list */
1183
data->next->prev= data->prev;
1185
lock->write.last=data->prev;
1187
if ((data->next=lock->write_wait.data)) /* Put first in lock_list */
1188
data->next->prev= &data->next;
1190
lock->write_wait.last= &data->next;
1191
data->prev= &lock->write_wait.data;
1192
data->cond=get_cond(); /* This was zero */
1193
lock->write_wait.data=data;
1194
free_all_read_locks(lock,0);
1196
pthread_mutex_unlock(&lock->mutex);
1197
return(thr_upgrade_write_delay_lock(data));
1203
/*****************************************************************************
1204
** Test of thread locks
1205
****************************************************************************/
1211
enum thr_lock_type lock_type;
1214
THR_LOCK locks[5]; /* 5 locks; the test tables below refer to them by lock number 0-4 */
1216
struct st_test test_0[] = {{0,TL_READ}}; /* One lock */
1217
struct st_test test_1[] = {{0,TL_READ},{0,TL_WRITE}}; /* Read and write lock of lock 0 */
1218
struct st_test test_2[] = {{1,TL_WRITE},{0,TL_READ},{2,TL_READ}};
1219
struct st_test test_3[] = {{2,TL_WRITE},{1,TL_READ},{0,TL_READ}}; /* Deadlock with test_2 ? */
1220
struct st_test test_4[] = {{0,TL_WRITE},{0,TL_READ},{0,TL_WRITE},{0,TL_READ}};
1221
struct st_test test_5[] = {{0,TL_READ},{1,TL_READ},{2,TL_READ},{3,TL_READ}}; /* Many reads */
1222
struct st_test test_6[] = {{0,TL_WRITE},{1,TL_WRITE},{2,TL_WRITE},{3,TL_WRITE}}; /* Many writes */
1223
struct st_test test_7[] = {{3,TL_READ}};
1224
struct st_test test_8[] = {{1,TL_READ_NO_INSERT},{2,TL_READ_NO_INSERT},{3,TL_READ_NO_INSERT}}; /* Should be quick */
1225
struct st_test test_9[] = {{4,TL_READ_HIGH_PRIORITY}};
1226
struct st_test test_10[] ={{4,TL_WRITE}};
1227
struct st_test test_11[] = {{0,TL_WRITE_LOW_PRIORITY},{1,TL_WRITE_LOW_PRIORITY},{2,TL_WRITE_LOW_PRIORITY},{3,TL_WRITE_LOW_PRIORITY}}; /* Many writes */
1228
struct st_test test_12[] = {{0,TL_WRITE_ALLOW_READ},{1,TL_WRITE_ALLOW_READ},{2,TL_WRITE_ALLOW_READ},{3,TL_WRITE_ALLOW_READ}}; /* Many writes */
1229
struct st_test test_13[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_WRITE_CONCURRENT_INSERT},{2,TL_WRITE_CONCURRENT_INSERT},{3,TL_WRITE_CONCURRENT_INSERT}};
1230
struct st_test test_14[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_READ}};
1231
struct st_test test_15[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_READ}};
1232
struct st_test test_16[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_WRITE_ALLOW_WRITE}};
1234
struct st_test *tests[] = {test_0,test_1,test_2,test_3,test_4,test_5,test_6,
1235
test_7,test_8,test_9,test_10,test_11,test_12,
1236
test_13,test_14,test_15,test_16};
1237
int lock_counts[]= {sizeof(test_0)/sizeof(struct st_test),
1238
sizeof(test_1)/sizeof(struct st_test),
1239
sizeof(test_2)/sizeof(struct st_test),
1240
sizeof(test_3)/sizeof(struct st_test),
1241
sizeof(test_4)/sizeof(struct st_test),
1242
sizeof(test_5)/sizeof(struct st_test),
1243
sizeof(test_6)/sizeof(struct st_test),
1244
sizeof(test_7)/sizeof(struct st_test),
1245
sizeof(test_8)/sizeof(struct st_test),
1246
sizeof(test_9)/sizeof(struct st_test),
1247
sizeof(test_10)/sizeof(struct st_test),
1248
sizeof(test_11)/sizeof(struct st_test),
1249
sizeof(test_12)/sizeof(struct st_test),
1250
sizeof(test_13)/sizeof(struct st_test),
1251
sizeof(test_14)/sizeof(struct st_test),
1252
sizeof(test_15)/sizeof(struct st_test),
1253
sizeof(test_16)/sizeof(struct st_test)
1257
static pthread_cond_t COND_thread_count;
1258
static pthread_mutex_t LOCK_thread_count;
1259
static uint thread_count;
1260
static uint32_t sum=0;
1262
#define MAX_LOCK_COUNT 8
1264
/* The following functions are for WRITE_CONCURRENT_INSERT */
1266
static void test_get_status(void* param __attribute__((unused)),
1267
int concurrent_insert __attribute__((unused)))
1271
static void test_update_status(void* param __attribute__((unused)))
1275
static void test_copy_status(void* to __attribute__((unused)) ,
1276
void *from __attribute__((unused)))
1280
static bool test_check_status(void* param __attribute__((unused)))
1286
static void *test_thread(void *arg)
1288
int i,j,param=*((int*) arg);
1289
THR_LOCK_DATA data[MAX_LOCK_COUNT];
1290
THR_LOCK_OWNER owner;
1291
THR_LOCK_INFO lock_info;
1292
THR_LOCK_DATA *multi_locks[MAX_LOCK_COUNT];
1295
printf("Thread %s (%d) started\n",my_thread_name(),param); fflush(stdout);
1298
thr_lock_info_init(&lock_info);
1299
thr_lock_owner_init(&owner, &lock_info);
1300
for (i=0; i < lock_counts[param] ; i++)
1301
thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL);
1302
for (j=1 ; j < 10 ; j++) /* try locking 10 times */
1304
for (i=0; i < lock_counts[param] ; i++)
1305
{ /* Init multi locks */
1306
multi_locks[i]= &data[i];
1307
data[i].type= tests[param][i].lock_type;
1309
thr_multi_lock(multi_locks, lock_counts[param], &owner);
1310
pthread_mutex_lock(&LOCK_thread_count);
1312
int tmp=rand() & 7; /* Do something from 0-2 sec */
1320
for (k=0 ; k < (uint32_t) (tmp-2)*100000L ; k++)
1324
pthread_mutex_unlock(&LOCK_thread_count);
1325
thr_multi_unlock(multi_locks,lock_counts[param]);
1328
printf("Thread %s (%d) ended\n",my_thread_name(),param); fflush(stdout);
1330
pthread_mutex_lock(&LOCK_thread_count);
1332
VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */
1333
pthread_mutex_unlock(&LOCK_thread_count);
1339
int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
1342
pthread_attr_t thr_attr;
1346
printf("Main thread: %s\n",my_thread_name());
1348
if ((error=pthread_cond_init(&COND_thread_count,NULL)))
1350
fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
1354
if ((error=pthread_mutex_init(&LOCK_thread_count,MY_MUTEX_INIT_FAST)))
1356
fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
1361
for (i=0 ; i < (int) array_elements(locks) ; i++)
1363
thr_lock_init(locks+i);
1364
locks[i].check_status= test_check_status;
1365
locks[i].update_status=test_update_status;
1366
locks[i].copy_status= test_copy_status;
1367
locks[i].get_status= test_get_status;
1369
if ((error=pthread_attr_init(&thr_attr)))
1371
fprintf(stderr,"Got error: %d from pthread_attr_init (errno: %d)",
1375
if ((error=pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED)))
1378
"Got error: %d from pthread_attr_setdetachstate (errno: %d)",
1382
#ifndef pthread_attr_setstacksize /* void return value */
1383
if ((error=pthread_attr_setstacksize(&thr_attr,65536L)))
1385
fprintf(stderr,"Got error: %d from pthread_attr_setstacksize (errno: %d)",
1390
#ifdef HAVE_THR_SETCONCURRENCY
1391
VOID(thr_setconcurrency(2));
1393
for (i=0 ; i < (int) array_elements(lock_counts) ; i++)
1395
param=(int*) malloc(sizeof(int));
1398
if ((error=pthread_mutex_lock(&LOCK_thread_count)))
1400
fprintf(stderr,"Got error: %d from pthread_mutex_lock (errno: %d)",
1404
if ((error=pthread_create(&tid,&thr_attr,test_thread,(void*) param)))
1406
fprintf(stderr,"Got error: %d from pthread_create (errno: %d)\n",
1408
pthread_mutex_unlock(&LOCK_thread_count);
1412
pthread_mutex_unlock(&LOCK_thread_count);
1415
pthread_attr_destroy(&thr_attr);
1416
if ((error=pthread_mutex_lock(&LOCK_thread_count)))
1417
fprintf(stderr,"Got error: %d from pthread_mutex_lock\n",error);
1418
while (thread_count)
1420
if ((error=pthread_cond_wait(&COND_thread_count,&LOCK_thread_count)))
1421
fprintf(stderr,"Got error: %d from pthread_cond_wait\n",error);
1423
if ((error=pthread_mutex_unlock(&LOCK_thread_count)))
1424
fprintf(stderr,"Got error: %d from pthread_mutex_unlock\n",error);
1425
for (i=0 ; i < (int) array_elements(locks) ; i++)
1426
thr_lock_delete(locks+i);
1429
printf("Got %d warnings\n",found_errors);
1432
printf("Test succeeded\n");