80
90
#include <drizzled/util/test.h>
82
#include <boost/interprocess/sync/lock_options.hpp>
92
bool thr_lock_inited=0;
93
uint32_t locks_immediate = 0L, locks_waited = 0L;
94
ulong table_lock_wait_timeout;
95
enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
97
/* The following constants are only for debug output */
98
#define MAX_THREADS 100
102
LIST *thr_lock_thread_list; /* List of threads in use */
103
ulong max_write_lock_count= ~(ulong) 0L;
105
static inline pthread_cond_t *get_cond(void)
89
uint64_t table_lock_wait_timeout;
90
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
93
uint64_t max_write_lock_count= UINT64_MAX;
95
static void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count);
107
return &my_thread_var->suspend;
98
111
** For the future (now the thread specific cond is alloced by my_pthread.c)
101
120
static inline bool
102
121
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
104
123
return rhs == lhs;
127
#define MAX_FOUND_ERRORS 10 /* Report 10 first errors */
128
static uint32_t found_errors=0;
130
static int check_lock(struct st_lock_list *list, const char* lock_type,
131
const char *where, bool same_owner, bool no_cond)
133
THR_LOCK_DATA *data,**prev;
135
THR_LOCK_OWNER *first_owner;
140
enum thr_lock_type last_lock_type=list->data->type;
142
if (same_owner && list->data)
143
first_owner= list->data->owner;
144
for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next)
146
if (data->type != last_lock_type)
147
last_lock_type=TL_IGNORE;
148
if (data->prev != prev)
151
"Warning: prev link %d didn't point at previous lock at %s: %s\n",
152
count, lock_type, where);
156
!thr_lock_owner_equal(data->owner, first_owner) &&
157
last_lock_type != TL_WRITE_ALLOW_WRITE)
160
"Warning: Found locks from different threads in %s: %s\n",
164
if (no_cond && data->cond)
167
"Warning: Found active lock with not reset cond %s: %s\n",
175
fprintf(stderr,"Warning: found too many locks at %s: %s\n",
180
if (prev != list->last)
182
fprintf(stderr,"Warning: last didn't point at last lock at %s: %s\n",
190
static void check_locks(THR_LOCK *lock, const char *where,
193
uint32_t old_found_errors=found_errors;
195
if (found_errors < MAX_FOUND_ERRORS)
197
if (check_lock(&lock->write,"write",where,1,1) |
198
check_lock(&lock->write_wait,"write_wait",where,0,0) |
199
check_lock(&lock->read,"read",where,0,1) |
200
check_lock(&lock->read_wait,"read_wait",where,0,0))
203
if (found_errors < MAX_FOUND_ERRORS)
207
for (data=lock->read.data ; data ; data=data->next)
209
if ((int) data->type == (int) TL_READ_NO_INSERT)
211
/* Protect against infinite loop. */
212
assert(count <= lock->read_no_write_count);
214
if (count != lock->read_no_write_count)
218
"Warning at '%s': Locks read_no_write_count was %u when it should have been %u\n", where, lock->read_no_write_count,count);
221
if (!lock->write.data)
223
if (!allow_no_locks && !lock->read.data &&
224
(lock->write_wait.data || lock->read_wait.data))
228
"Warning at '%s': No locks in use but locks are in wait queue\n",
231
if (!lock->write_wait.data)
233
if (!allow_no_locks && lock->read_wait.data)
237
"Warning at '%s': No write locks and waiting read locks\n",
243
if (!allow_no_locks &&
244
(((lock->write_wait.data->type == TL_WRITE_CONCURRENT_INSERT ||
245
lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE) &&
246
!lock->read_no_write_count) ||
247
lock->write_wait.data->type == TL_WRITE_ALLOW_READ ||
248
(lock->write_wait.data->type == TL_WRITE_DELAYED &&
253
"Warning at '%s': Write lock %d waiting while no exclusive read locks\n",where,(int) lock->write_wait.data->type);
258
{ /* Have write lock */
259
if (lock->write_wait.data)
261
if (!allow_no_locks &&
262
lock->write.data->type == TL_WRITE_ALLOW_WRITE &&
263
lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE)
267
"Warning at '%s': Found WRITE_ALLOW_WRITE lock waiting for WRITE_ALLOW_WRITE lock\n",
273
if (!thr_lock_owner_equal(lock->write.data->owner,
274
lock->read.data->owner) &&
275
((lock->write.data->type > TL_WRITE_DELAYED &&
276
lock->write.data->type != TL_WRITE_ONLY) ||
277
((lock->write.data->type == TL_WRITE_CONCURRENT_INSERT ||
278
lock->write.data->type == TL_WRITE_ALLOW_WRITE) &&
279
lock->read_no_write_count)))
283
"Warning at '%s': Found lock of type %d that is write and read locked\n",
284
where, lock->write.data->type);
287
if (lock->read_wait.data)
289
if (!allow_no_locks && lock->write.data->type <= TL_WRITE_DELAYED &&
290
lock->read_wait.data->type <= TL_READ_HIGH_PRIORITY)
294
"Warning at '%s': Found read lock of type %d waiting for write lock of type %d\n",
296
(int) lock->read_wait.data->type,
297
(int) lock->write.data->type);
306
#else /* EXTRA_DEBUG */
307
#define check_locks(A,B,C)
108
311
/* Initialize a lock */
110
313
void thr_lock_init(THR_LOCK *lock)
315
memset(lock, 0, sizeof(*lock));
316
pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
113
317
lock->read.last= &lock->read.data;
114
318
lock->read_wait.last= &lock->read_wait.data;
115
319
lock->write_wait.last= &lock->write_wait.data;
116
320
lock->write.last= &lock->write.data;
120
void THR_LOCK_INFO::init()
122
internal::st_my_thread_var *tmp= my_thread_var;
123
thread= tmp->pthread_self;
322
pthread_mutex_lock(&THR_LOCK_lock); /* Add to locks in use */
323
lock->list.data=(void*) lock;
324
thr_lock_thread_list=list_add(thr_lock_thread_list,&lock->list);
325
pthread_mutex_unlock(&THR_LOCK_lock);
330
void thr_lock_delete(THR_LOCK *lock)
332
pthread_mutex_destroy(&lock->mutex);
333
pthread_mutex_lock(&THR_LOCK_lock);
334
thr_lock_thread_list=list_delete(thr_lock_thread_list,&lock->list);
335
pthread_mutex_unlock(&THR_LOCK_lock);
340
void thr_lock_info_init(THR_LOCK_INFO *info)
342
struct st_my_thread_var *tmp= my_thread_var;
343
info->thread= tmp->pthread_self;
344
info->thread_id= tmp->id;
128
348
/* Initialize a lock instance */
130
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
350
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
134
owner= NULL; /* no owner yet */
135
status_param= param_arg;
353
data->type=TL_UNLOCK;
354
data->owner= 0; /* no owner yet */
355
data->status_param=param;
691
1042
This is used to abort all locks for a specific thread
694
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
1045
bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
1047
THR_LOCK_DATA *data;
696
1048
bool found= false;
698
boost_unique_lock_t scopedLock(mutex);
699
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
1050
pthread_mutex_lock(&lock->mutex);
1051
for (data= lock->read_wait.data; data ; data= data->next)
701
if (local_data->owner->info->thread_id == thread_id_arg)
1053
if (data->owner->info->thread_id == thread_id) /* purecov: tested */
703
local_data->type= TL_UNLOCK; /* Mark killed */
1055
data->type= TL_UNLOCK; /* Mark killed */
704
1056
/* It's safe to signal the cond first: we're still holding the mutex. */
706
local_data->cond->notify_one();
707
local_data->cond= 0; /* Removed from list */
1058
pthread_cond_signal(data->cond);
1059
data->cond= 0; /* Removed from list */
709
if (((*local_data->prev)= local_data->next))
710
local_data->next->prev= local_data->prev;
1061
if (((*data->prev)= data->next))
1062
data->next->prev= data->prev;
712
read_wait.last= local_data->prev;
1064
lock->read_wait.last= data->prev;
715
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
1067
for (data= lock->write_wait.data; data ; data= data->next)
717
if (local_data->owner->info->thread_id == thread_id_arg)
1069
if (data->owner->info->thread_id == thread_id) /* purecov: tested */
719
local_data->type= TL_UNLOCK;
1071
data->type= TL_UNLOCK;
721
local_data->cond->notify_one();
722
local_data->cond= NULL;
724
if (((*local_data->prev)= local_data->next))
725
local_data->next->prev= local_data->prev;
727
write_wait.last= local_data->prev;
730
wake_up_waiters(this);
735
} /* namespace drizzled */
1073
pthread_cond_signal(data->cond);
1076
if (((*data->prev)= data->next))
1077
data->next->prev= data->prev;
1079
lock->write_wait.last= data->prev;
1082
wake_up_waiters(lock);
1083
pthread_mutex_unlock(&lock->mutex);
1089
Downgrade a WRITE_* to a lower WRITE level
1091
thr_downgrade_write_lock()
1092
in_data Lock data of thread downgrading its lock
1093
new_lock_type New write lock type
1097
This can be used to downgrade a lock already owned. When the downgrade
1098
occurs also other waiters, both readers and writers can be allowed to
1100
The previous lock is often TL_WRITE_ONLY but can also be
1101
TL_WRITE and TL_WRITE_ALLOW_READ. The normal downgrade variants are
1102
TL_WRITE_ONLY => TL_WRITE_ALLOW_READ After a short exclusive lock
1103
TL_WRITE_ALLOW_READ => TL_WRITE_ALLOW_WRITE After discovering that the
1104
operation didn't need such a high lock.
1105
TL_WRITE_ONLY => TL_WRITE after a short exclusive lock while holding a
1107
TL_WRITE_ONLY => TL_WRITE_ALLOW_WRITE After a short exclusive lock after
1108
already earlier having dongraded lock to TL_WRITE_ALLOW_WRITE
1109
The implementation is conservative and rather don't start rather than
1110
go on unknown paths to start, the common cases are handled.
1113
In its current implementation it is only allowed to downgrade from
1114
TL_WRITE_ONLY. In this case there are no waiters. Thus no wake up
1118
void thr_downgrade_write_lock(THR_LOCK_DATA *in_data,
1119
enum thr_lock_type new_lock_type)
1121
THR_LOCK *lock=in_data->lock;
1123
pthread_mutex_lock(&lock->mutex);
1124
in_data->type= new_lock_type;
1125
check_locks(lock,"after downgrading lock",0);
1127
pthread_mutex_unlock(&lock->mutex);
1131
/* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */
1133
bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
1135
THR_LOCK *lock=data->lock;
1137
pthread_mutex_lock(&lock->mutex);
1138
if (data->type == TL_UNLOCK || data->type >= TL_WRITE_LOW_PRIORITY)
1140
pthread_mutex_unlock(&lock->mutex);
1141
return(data->type == TL_UNLOCK); /* Test if Aborted */
1143
check_locks(lock,"before upgrading lock",0);
1144
/* TODO: Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */
1145
data->type=TL_WRITE; /* Upgrade lock */
1147
/* Check if someone has given us the lock */
1150
if (!lock->read.data) /* No read locks */
1151
{ /* We have the lock */
1152
if (data->lock->get_status)
1153
(*data->lock->get_status)(data->status_param, 0);
1154
pthread_mutex_unlock(&lock->mutex);
1158
if (((*data->prev)=data->next)) /* remove from lock-list */
1159
data->next->prev= data->prev;
1161
lock->write.last=data->prev;
1163
if ((data->next=lock->write_wait.data)) /* Put first in lock_list */
1164
data->next->prev= &data->next;
1166
lock->write_wait.last= &data->next;
1167
data->prev= &lock->write_wait.data;
1168
lock->write_wait.data=data;
1169
check_locks(lock,"upgrading lock",0);
1173
check_locks(lock,"waiting for lock",0);
1175
return(wait_for_lock(&lock->write_wait,data,1));
1179
/* downgrade a WRITE lock to a WRITE_DELAY lock if there is pending locks */
1181
bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
1183
THR_LOCK *lock=data->lock;
1185
pthread_mutex_lock(&lock->mutex);
1186
if (!lock->read_wait.data) /* No waiting read locks */
1188
pthread_mutex_unlock(&lock->mutex);
1192
data->type=TL_WRITE_DELAYED;
1193
if (lock->update_status)
1194
(*lock->update_status)(data->status_param);
1195
if (((*data->prev)=data->next)) /* remove from lock-list */
1196
data->next->prev= data->prev;
1198
lock->write.last=data->prev;
1200
if ((data->next=lock->write_wait.data)) /* Put first in lock_list */
1201
data->next->prev= &data->next;
1203
lock->write_wait.last= &data->next;
1204
data->prev= &lock->write_wait.data;
1205
data->cond=get_cond(); /* This was zero */
1206
lock->write_wait.data=data;
1207
free_all_read_locks(lock,0);
1209
pthread_mutex_unlock(&lock->mutex);
1210
return(thr_upgrade_write_delay_lock(data));
1216
/*****************************************************************************
1217
** Test of thread locks
1218
****************************************************************************/
1224
enum thr_lock_type lock_type;
1227
THR_LOCK locks[5]; /* 4 locks */
1229
struct st_test test_0[] = {{0,TL_READ}}; /* One lock */
1230
struct st_test test_1[] = {{0,TL_READ},{0,TL_WRITE}}; /* Read and write lock of lock 0 */
1231
struct st_test test_2[] = {{1,TL_WRITE},{0,TL_READ},{2,TL_READ}};
1232
struct st_test test_3[] = {{2,TL_WRITE},{1,TL_READ},{0,TL_READ}}; /* Deadlock with test_2 ? */
1233
struct st_test test_4[] = {{0,TL_WRITE},{0,TL_READ},{0,TL_WRITE},{0,TL_READ}};
1234
struct st_test test_5[] = {{0,TL_READ},{1,TL_READ},{2,TL_READ},{3,TL_READ}}; /* Many reads */
1235
struct st_test test_6[] = {{0,TL_WRITE},{1,TL_WRITE},{2,TL_WRITE},{3,TL_WRITE}}; /* Many writes */
1236
struct st_test test_7[] = {{3,TL_READ}};
1237
struct st_test test_8[] = {{1,TL_READ_NO_INSERT},{2,TL_READ_NO_INSERT},{3,TL_READ_NO_INSERT}}; /* Should be quick */
1238
struct st_test test_9[] = {{4,TL_READ_HIGH_PRIORITY}};
1239
struct st_test test_10[] ={{4,TL_WRITE}};
1240
struct st_test test_11[] = {{0,TL_WRITE_LOW_PRIORITY},{1,TL_WRITE_LOW_PRIORITY},{2,TL_WRITE_LOW_PRIORITY},{3,TL_WRITE_LOW_PRIORITY}}; /* Many writes */
1241
struct st_test test_12[] = {{0,TL_WRITE_ALLOW_READ},{1,TL_WRITE_ALLOW_READ},{2,TL_WRITE_ALLOW_READ},{3,TL_WRITE_ALLOW_READ}}; /* Many writes */
1242
struct st_test test_13[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_WRITE_CONCURRENT_INSERT},{2,TL_WRITE_CONCURRENT_INSERT},{3,TL_WRITE_CONCURRENT_INSERT}};
1243
struct st_test test_14[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_READ}};
1244
struct st_test test_15[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_READ}};
1245
struct st_test test_16[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_WRITE_ALLOW_WRITE}};
1247
struct st_test *tests[] = {test_0,test_1,test_2,test_3,test_4,test_5,test_6,
1248
test_7,test_8,test_9,test_10,test_11,test_12,
1249
test_13,test_14,test_15,test_16};
1250
int lock_counts[]= {sizeof(test_0)/sizeof(struct st_test),
1251
sizeof(test_1)/sizeof(struct st_test),
1252
sizeof(test_2)/sizeof(struct st_test),
1253
sizeof(test_3)/sizeof(struct st_test),
1254
sizeof(test_4)/sizeof(struct st_test),
1255
sizeof(test_5)/sizeof(struct st_test),
1256
sizeof(test_6)/sizeof(struct st_test),
1257
sizeof(test_7)/sizeof(struct st_test),
1258
sizeof(test_8)/sizeof(struct st_test),
1259
sizeof(test_9)/sizeof(struct st_test),
1260
sizeof(test_10)/sizeof(struct st_test),
1261
sizeof(test_11)/sizeof(struct st_test),
1262
sizeof(test_12)/sizeof(struct st_test),
1263
sizeof(test_13)/sizeof(struct st_test),
1264
sizeof(test_14)/sizeof(struct st_test),
1265
sizeof(test_15)/sizeof(struct st_test),
1266
sizeof(test_16)/sizeof(struct st_test)
1270
static pthread_cond_t COND_thread_count;
1271
static pthread_mutex_t LOCK_thread_count;
1272
static uint32_t thread_count;
1273
static uint32_t sum=0;
1275
#define MAX_LOCK_COUNT 8
1277
/* The following functions is for WRITE_CONCURRENT_INSERT */
1279
static void test_get_status(void* param __attribute__((unused)),
1280
int concurrent_insert __attribute__((unused)))
1284
static void test_update_status(void* param __attribute__((unused)))
1288
static void test_copy_status(void* to __attribute__((unused)) ,
1289
void *from __attribute__((unused)))
1293
static bool test_check_status(void* param __attribute__((unused)))
1299
static void *test_thread(void *arg)
1301
int i,j,param=*((int*) arg);
1302
THR_LOCK_DATA data[MAX_LOCK_COUNT];
1303
THR_LOCK_OWNER owner;
1304
THR_LOCK_INFO lock_info;
1305
THR_LOCK_DATA *multi_locks[MAX_LOCK_COUNT];
1308
printf("Thread %s (%d) started\n",my_thread_name(),param); fflush(stdout);
1311
thr_lock_info_init(&lock_info);
1312
thr_lock_owner_init(&owner, &lock_info);
1313
for (i=0; i < lock_counts[param] ; i++)
1314
thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL);
1315
for (j=1 ; j < 10 ; j++) /* try locking 10 times */
1317
for (i=0; i < lock_counts[param] ; i++)
1318
{ /* Init multi locks */
1319
multi_locks[i]= &data[i];
1320
data[i].type= tests[param][i].lock_type;
1322
thr_multi_lock(multi_locks, lock_counts[param], &owner);
1323
pthread_mutex_lock(&LOCK_thread_count);
1325
int tmp=rand() & 7; /* Do something from 0-2 sec */
1333
for (k=0 ; k < (uint32_t) (tmp-2)*100000L ; k++)
1337
pthread_mutex_unlock(&LOCK_thread_count);
1338
thr_multi_unlock(multi_locks,lock_counts[param]);
1341
printf("Thread %s (%d) ended\n",my_thread_name(),param); fflush(stdout);
1343
pthread_mutex_lock(&LOCK_thread_count);
1345
pthread_cond_signal(&COND_thread_count); /* Tell main we are ready */
1346
pthread_mutex_unlock(&LOCK_thread_count);
1347
free((unsigned char*) arg);
1352
int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
1355
pthread_attr_t thr_attr;
1359
printf("Main thread: %s\n",my_thread_name());
1361
if ((error=pthread_cond_init(&COND_thread_count,NULL)))
1363
fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
1367
if ((error=pthread_mutex_init(&LOCK_thread_count,MY_MUTEX_INIT_FAST)))
1369
fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
1374
for (i=0 ; i < (int) array_elements(locks) ; i++)
1376
thr_lock_init(locks+i);
1377
locks[i].check_status= test_check_status;
1378
locks[i].update_status=test_update_status;
1379
locks[i].copy_status= test_copy_status;
1380
locks[i].get_status= test_get_status;
1382
if ((error=pthread_attr_init(&thr_attr)))
1384
fprintf(stderr,"Got error: %d from pthread_attr_init (errno: %d)",
1388
if ((error=pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED)))
1391
"Got error: %d from pthread_attr_setdetachstate (errno: %d)",
1395
#ifndef pthread_attr_setstacksize /* void return value */
1396
if ((error=pthread_attr_setstacksize(&thr_attr,65536L)))
1398
fprintf(stderr,"Got error: %d from pthread_attr_setstacksize (errno: %d)",
1403
#ifdef HAVE_THR_SETCONCURRENCY
1404
thr_setconcurrency(2);
1406
for (i=0 ; i < (int) array_elements(lock_counts) ; i++)
1408
param=(int*) malloc(sizeof(int));
1411
if ((error=pthread_mutex_lock(&LOCK_thread_count)))
1413
fprintf(stderr,"Got error: %d from pthread_mutex_lock (errno: %d)",
1417
if ((error=pthread_create(&tid,&thr_attr,test_thread,(void*) param)))
1419
fprintf(stderr,"Got error: %d from pthread_create (errno: %d)\n",
1421
pthread_mutex_unlock(&LOCK_thread_count);
1425
pthread_mutex_unlock(&LOCK_thread_count);
1428
pthread_attr_destroy(&thr_attr);
1429
if ((error=pthread_mutex_lock(&LOCK_thread_count)))
1430
fprintf(stderr,"Got error: %d from pthread_mutex_lock\n",error);
1431
while (thread_count)
1433
if ((error=pthread_cond_wait(&COND_thread_count,&LOCK_thread_count)))
1434
fprintf(stderr,"Got error: %d from pthread_cond_wait\n",error);
1436
if ((error=pthread_mutex_unlock(&LOCK_thread_count)))
1437
fprintf(stderr,"Got error: %d from pthread_mutex_unlock\n",error);
1438
for (i=0 ; i < (int) array_elements(locks) ; i++)
1439
thr_lock_delete(locks+i);
1442
printf("Got %d warnings\n",found_errors);
1445
printf("Test succeeded\n");