~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/thr_lock.c

  • Committer: Monty Taylor
  • Date: 2008-08-01 22:33:44 UTC
  • mto: (236.1.42 codestyle)
  • mto: This revision was merged to the branch mainline in revision 261.
  • Revision ID: monty@inaugust.com-20080801223344-vzhlflfmtijp1imv
First pass at gettexizing the error messages.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All threads must acquire
26
26
 
27
27
TL_READ                 # Low priority read
28
28
TL_READ_WITH_SHARED_LOCKS
 
29
TL_READ_HIGH_PRIORITY   # High priority read
29
30
TL_READ_NO_INSERT       # Read without concurrent inserts
30
31
TL_WRITE_ALLOW_WRITE    # Write lock that allows other writers
31
32
TL_WRITE_ALLOW_READ     # Write lock, but allow reading
32
33
TL_WRITE_CONCURRENT_INSERT
33
34
                        # Insert that can be mixed with selects
 
35
TL_WRITE_DELAYED        # Used by delayed insert
 
36
                        # Allows lower locks to take over
 
37
TL_WRITE_LOW_PRIORITY   # Low priority write
34
38
TL_WRITE                # High priority write
35
39
TL_WRITE_ONLY           # High priority write
36
40
                        # Abort all new lock requests with an error
46
50
should put a pointer to the following functions in the lock structure:
47
51
(If the pointer is zero (default), the function is not called)
48
52
 
 
53
check_status:
 
54
         Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
 
55
         we check if this function exists and returns 0.
 
56
         If not, then the lock is upgraded to TL_WRITE
 
57
         In MyISAM this is a simple check if the insert can be done
 
58
         at the end of the datafile.
 
59
update_status:
 
60
        Before a write lock is released, this function is called.
 
61
        In MyISAM this function updates the count and length of the datafile
 
62
get_status:
 
63
        When one gets a lock this function is called.
 
64
        In MyISAM this stores the number of rows and size of the datafile
 
65
        for concurrent reads.
49
66
 
50
67
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
 
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
 
68
TL_WRITE_CONCURRENT_INSERT or one TL_WRITE_DELAYED lock at the same time as
 
69
multiple read locks.
52
70
 
53
71
*/
54
72
 
55
 
#include <config.h>
56
 
#include <drizzled/internal/my_sys.h>
57
 
#include <drizzled/internal/thread_var.h>
58
 
#include <drizzled/statistics_variables.h>
59
 
#include <drizzled/pthread_globals.h>
60
 
 
61
 
#include <drizzled/session.h>
 
73
#include "mysys_priv.h"
62
74
 
63
75
#include "thr_lock.h"
64
 
#include <drizzled/internal/m_string.h>
 
76
#include <mystrings/m_string.h>
65
77
#include <errno.h>
66
 
#include <list>
67
 
 
68
 
#if TIME_WITH_SYS_TIME
69
 
# include <sys/time.h>
70
 
# include <time.h>
71
 
#else
72
 
# if HAVE_SYS_TIME_H
73
 
#  include <sys/time.h>
74
 
# else
75
 
#  include <time.h>
76
 
# endif
77
 
#endif
78
 
 
79
 
#include <drizzled/util/test.h>
80
 
 
81
 
#include <boost/interprocess/sync/lock_options.hpp>
82
 
 
83
 
using namespace std;
84
 
 
85
 
namespace drizzled
 
78
 
 
79
bool thr_lock_inited=0;
 
80
ulong locks_immediate = 0L, locks_waited = 0L;
 
81
ulong table_lock_wait_timeout;
 
82
enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
 
83
 
 
84
/* The following constants are only for debug output */
 
85
#define MAX_THREADS 100
 
86
#define MAX_LOCKS   100
 
87
 
 
88
 
 
89
LIST *thr_lock_thread_list;                     /* List of threads in use */
 
90
ulong max_write_lock_count= ~(ulong) 0L;
 
91
 
 
92
static inline pthread_cond_t *get_cond(void)
86
93
{
87
 
 
88
 
uint64_t table_lock_wait_timeout;
89
 
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
90
 
 
91
 
 
92
 
uint64_t max_write_lock_count= UINT64_MAX;
 
94
  return &my_thread_var->suspend;
 
95
}
93
96
 
94
97
/*
95
98
** For the future (now the thread specific cond is alloced by my_pthread.c)
96
99
*/
97
100
 
 
101
bool init_thr_lock()
 
102
{
 
103
  thr_lock_inited=1;
 
104
  return 0;
 
105
}
 
106
 
98
107
static inline bool
99
108
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
100
109
{
101
110
  return rhs == lhs;
102
111
}
103
112
 
 
113
#ifdef EXTRA_DEBUG
 
114
#define MAX_FOUND_ERRORS        10              /* Report 10 first errors */
 
115
static uint found_errors=0;
 
116
 
 
117
static int check_lock(struct st_lock_list *list, const char* lock_type,
 
118
                      const char *where, bool same_owner, bool no_cond)
 
119
{
 
120
  THR_LOCK_DATA *data,**prev;
 
121
  uint count=0;
 
122
  THR_LOCK_OWNER *first_owner;
 
123
 
 
124
  prev= &list->data;
 
125
  if (list->data)
 
126
  {
 
127
    enum thr_lock_type last_lock_type=list->data->type;
 
128
 
 
129
    if (same_owner && list->data)
 
130
      first_owner= list->data->owner;
 
131
    for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next)
 
132
    {
 
133
      if (data->type != last_lock_type)
 
134
        last_lock_type=TL_IGNORE;
 
135
      if (data->prev != prev)
 
136
      {
 
137
        fprintf(stderr,
 
138
                "Warning: prev link %d didn't point at previous lock at %s: %s\n",
 
139
                count, lock_type, where);
 
140
        return 1;
 
141
      }
 
142
      if (same_owner &&
 
143
          !thr_lock_owner_equal(data->owner, first_owner) &&
 
144
          last_lock_type != TL_WRITE_ALLOW_WRITE)
 
145
      {
 
146
        fprintf(stderr,
 
147
                "Warning: Found locks from different threads in %s: %s\n",
 
148
                lock_type,where);
 
149
        return 1;
 
150
      }
 
151
      if (no_cond && data->cond)
 
152
      {
 
153
        fprintf(stderr,
 
154
                "Warning: Found active lock with not reset cond %s: %s\n",
 
155
                lock_type,where);
 
156
        return 1;
 
157
      }
 
158
      prev= &data->next;
 
159
    }
 
160
    if (data)
 
161
    {
 
162
      fprintf(stderr,"Warning: found too many locks at %s: %s\n",
 
163
              lock_type,where);
 
164
      return 1;
 
165
    }
 
166
  }
 
167
  if (prev != list->last)
 
168
  {
 
169
    fprintf(stderr,"Warning: last didn't point at last lock at %s: %s\n",
 
170
            lock_type, where);
 
171
    return 1;
 
172
  }
 
173
  return 0;
 
174
}
 
175
 
 
176
 
 
177
static void check_locks(THR_LOCK *lock, const char *where,
 
178
                        bool allow_no_locks)
 
179
{
 
180
  uint old_found_errors=found_errors;
 
181
 
 
182
  if (found_errors < MAX_FOUND_ERRORS)
 
183
  {
 
184
    if (check_lock(&lock->write,"write",where,1,1) |
 
185
        check_lock(&lock->write_wait,"write_wait",where,0,0) |
 
186
        check_lock(&lock->read,"read",where,0,1) |
 
187
        check_lock(&lock->read_wait,"read_wait",where,0,0))
 
188
      found_errors++;
 
189
 
 
190
    if (found_errors < MAX_FOUND_ERRORS)
 
191
    {
 
192
      uint count=0;
 
193
      THR_LOCK_DATA *data;
 
194
      for (data=lock->read.data ; data ; data=data->next)
 
195
      {
 
196
        if ((int) data->type == (int) TL_READ_NO_INSERT)
 
197
          count++;
 
198
        /* Protect against infinite loop. */
 
199
        assert(count <= lock->read_no_write_count);
 
200
      }
 
201
      if (count != lock->read_no_write_count)
 
202
      {
 
203
        found_errors++;
 
204
        fprintf(stderr,
 
205
                "Warning at '%s': Locks read_no_write_count was %u when it should have been %u\n", where, lock->read_no_write_count,count);
 
206
      }      
 
207
 
 
208
      if (!lock->write.data)
 
209
      {
 
210
        if (!allow_no_locks && !lock->read.data &&
 
211
            (lock->write_wait.data || lock->read_wait.data))
 
212
        {
 
213
          found_errors++;
 
214
          fprintf(stderr,
 
215
                  "Warning at '%s': No locks in use but locks are in wait queue\n",
 
216
                  where);
 
217
        }
 
218
        if (!lock->write_wait.data)
 
219
        {
 
220
          if (!allow_no_locks && lock->read_wait.data)
 
221
          {
 
222
            found_errors++;
 
223
            fprintf(stderr,
 
224
                    "Warning at '%s': No write locks and waiting read locks\n",
 
225
                    where);
 
226
          }
 
227
        }
 
228
        else
 
229
        {
 
230
          if (!allow_no_locks &&
 
231
              (((lock->write_wait.data->type == TL_WRITE_CONCURRENT_INSERT ||
 
232
                 lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE) &&
 
233
                !lock->read_no_write_count) ||
 
234
               lock->write_wait.data->type == TL_WRITE_ALLOW_READ ||
 
235
               (lock->write_wait.data->type == TL_WRITE_DELAYED &&
 
236
                !lock->read.data)))
 
237
          {
 
238
            found_errors++;
 
239
            fprintf(stderr,
 
240
                    "Warning at '%s': Write lock %d waiting while no exclusive read locks\n",where,(int) lock->write_wait.data->type);
 
241
          }
 
242
        }             
 
243
      }
 
244
      else
 
245
      {                                         /* Have write lock */
 
246
        if (lock->write_wait.data)
 
247
        {
 
248
          if (!allow_no_locks && 
 
249
              lock->write.data->type == TL_WRITE_ALLOW_WRITE &&
 
250
              lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE)
 
251
          {
 
252
            found_errors++;
 
253
            fprintf(stderr,
 
254
                    "Warning at '%s': Found WRITE_ALLOW_WRITE lock waiting for WRITE_ALLOW_WRITE lock\n",
 
255
                    where);
 
256
          }
 
257
        }
 
258
        if (lock->read.data)
 
259
        {
 
260
          if (!thr_lock_owner_equal(lock->write.data->owner,
 
261
                                    lock->read.data->owner) &&
 
262
              ((lock->write.data->type > TL_WRITE_DELAYED &&
 
263
                lock->write.data->type != TL_WRITE_ONLY) ||
 
264
               ((lock->write.data->type == TL_WRITE_CONCURRENT_INSERT ||
 
265
                 lock->write.data->type == TL_WRITE_ALLOW_WRITE) &&
 
266
                lock->read_no_write_count)))
 
267
          {
 
268
            found_errors++;
 
269
            fprintf(stderr,
 
270
                    "Warning at '%s': Found lock of type %d that is write and read locked\n",
 
271
                    where, lock->write.data->type);
 
272
          }
 
273
        }
 
274
        if (lock->read_wait.data)
 
275
        {
 
276
          if (!allow_no_locks && lock->write.data->type <= TL_WRITE_DELAYED &&
 
277
              lock->read_wait.data->type <= TL_READ_HIGH_PRIORITY)
 
278
          {
 
279
            found_errors++;
 
280
            fprintf(stderr,
 
281
                    "Warning at '%s': Found read lock of type %d waiting for write lock of type %d\n",
 
282
                    where,
 
283
                    (int) lock->read_wait.data->type,
 
284
                    (int) lock->write.data->type);
 
285
          }
 
286
        }
 
287
      }
 
288
    }
 
289
  }
 
290
  return;
 
291
}
 
292
 
 
293
#else /* EXTRA_DEBUG */
 
294
#define check_locks(A,B,C)
 
295
#endif
 
296
 
104
297
 
105
298
        /* Initialize a lock */
106
299
 
107
300
void thr_lock_init(THR_LOCK *lock)
108
301
{
109
 
  lock->init();
 
302
  memset((char*) lock, 0, sizeof(*lock));
 
303
  VOID(pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST));
110
304
  lock->read.last= &lock->read.data;
111
305
  lock->read_wait.last= &lock->read_wait.data;
112
306
  lock->write_wait.last= &lock->write_wait.data;
113
307
  lock->write.last= &lock->write.data;
114
 
}
115
 
 
116
 
 
117
 
void THR_LOCK_INFO::init()
118
 
{
119
 
  internal::st_my_thread_var *tmp= my_thread_var;
120
 
  thread_id= tmp->id;
121
 
  n_cursors= 0;
 
308
 
 
309
  pthread_mutex_lock(&THR_LOCK_lock);           /* Add to locks in use */
 
310
  lock->list.data=(void*) lock;
 
311
  thr_lock_thread_list=list_add(thr_lock_thread_list,&lock->list);
 
312
  pthread_mutex_unlock(&THR_LOCK_lock);
 
313
  return;
 
314
}
 
315
 
 
316
 
 
317
void thr_lock_delete(THR_LOCK *lock)
 
318
{
 
319
  VOID(pthread_mutex_destroy(&lock->mutex));
 
320
  pthread_mutex_lock(&THR_LOCK_lock);
 
321
  thr_lock_thread_list=list_delete(thr_lock_thread_list,&lock->list);
 
322
  pthread_mutex_unlock(&THR_LOCK_lock);
 
323
  return;
 
324
}
 
325
 
 
326
 
 
327
void thr_lock_info_init(THR_LOCK_INFO *info)
 
328
{
 
329
  struct st_my_thread_var *tmp= my_thread_var;
 
330
  info->thread=    tmp->pthread_self;
 
331
  info->thread_id= tmp->id;
 
332
  info->n_cursors= 0;
122
333
}
123
334
 
124
335
        /* Initialize a lock instance */
125
336
 
126
 
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
 
337
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
127
338
{
128
 
  lock= lock_arg;
129
 
  type= TL_UNLOCK;
130
 
  owner= NULL;                               /* no owner yet */
131
 
  status_param= param_arg;
132
 
  cond= NULL;
 
339
  data->lock=lock;
 
340
  data->type=TL_UNLOCK;
 
341
  data->owner= 0;                               /* no owner yet */
 
342
  data->status_param=param;
 
343
  data->cond=0;
133
344
}
134
345
 
135
346
 
139
350
  for ( ; data ; data=data->next)
140
351
  {
141
352
    if (thr_lock_owner_equal(data->owner, owner))
142
 
      return true;                                      /* Already locked by thread */
143
 
  }
144
 
  return false;
145
 
}
 
353
      return 1;                                 /* Already locked by thread */
 
354
  }
 
355
  return 0;
 
356
}
 
357
 
 
358
static inline bool have_specific_lock(THR_LOCK_DATA *data,
 
359
                                         enum thr_lock_type type)
 
360
{
 
361
  for ( ; data ; data=data->next)
 
362
  {
 
363
    if (data->type == type)
 
364
      return 1;
 
365
  }
 
366
  return 0;
 
367
}
 
368
 
146
369
 
147
370
static void wake_up_waiters(THR_LOCK *lock);
148
371
 
149
372
 
150
 
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
 
373
static enum enum_thr_lock_result
 
374
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
 
375
              bool in_wait_list)
151
376
{
152
 
  internal::st_my_thread_var *thread_var= session.getThreadVar();
153
 
 
154
 
  boost::condition_variable_any *cond= &thread_var->suspend;
 
377
  struct st_my_thread_var *thread_var= my_thread_var;
 
378
  pthread_cond_t *cond= &thread_var->suspend;
 
379
  struct timespec wait_timeout;
155
380
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
156
381
  bool can_deadlock= test(data->owner->info->n_cursors);
157
382
 
 
383
  if (!in_wait_list)
158
384
  {
159
385
    (*wait->last)=data;                         /* Wait for lock */
160
386
    data->prev= wait->last;
161
387
    wait->last= &data->next;
162
388
  }
163
389
 
164
 
  current_global_counters.locks_waited++;
 
390
  statistic_increment(locks_waited, &THR_LOCK_lock);
165
391
 
166
392
  /* Set up control struct to allow others to abort locks */
167
 
  thread_var->current_mutex= data->lock->native_handle();
168
 
  thread_var->current_cond=  &thread_var->suspend;
169
 
  data->cond= &thread_var->suspend;;
 
393
  thread_var->current_mutex= &data->lock->mutex;
 
394
  thread_var->current_cond=  cond;
 
395
  data->cond= cond;
170
396
 
171
 
  while (not thread_var->abort)
 
397
  if (can_deadlock)
 
398
    set_timespec(wait_timeout, table_lock_wait_timeout);
 
399
  while (!thread_var->abort || in_wait_list)
172
400
  {
173
 
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
174
 
 
175
 
    if (can_deadlock)
176
 
    {
177
 
      boost::xtime xt; 
178
 
      xtime_get(&xt, boost::TIME_UTC); 
179
 
      xt.sec += table_lock_wait_timeout; 
180
 
      if (not cond->timed_wait(scoped, xt))
181
 
      {
182
 
        result= THR_LOCK_WAIT_TIMEOUT;
183
 
        scoped.release();
184
 
        break;
185
 
      }
186
 
    }
187
 
    else
188
 
    {
189
 
      cond->wait(scoped);
190
 
    }
 
401
    int rc= (can_deadlock ?
 
402
             pthread_cond_timedwait(cond, &data->lock->mutex,
 
403
                                    &wait_timeout) :
 
404
             pthread_cond_wait(cond, &data->lock->mutex));
191
405
    /*
192
406
      We must break the wait if one of the following occurs:
193
407
      - the connection has been aborted (!thread_var->abort), but
201
415
      Order of checks below is important to not report about timeout
202
416
      if the predicate is true.
203
417
    */
204
 
    if (data->cond == NULL)
205
 
    {
206
 
      scoped.release();
207
 
      break;
208
 
    }
209
 
    scoped.release();
 
418
    if (data->cond == 0)
 
419
    {
 
420
      break;
 
421
    }
 
422
    if (rc == ETIMEDOUT || rc == ETIME)
 
423
    {
 
424
      /* purecov: begin inspected */
 
425
      result= THR_LOCK_WAIT_TIMEOUT;
 
426
      break;
 
427
      /* purecov: end */
 
428
    }
210
429
  }
211
430
  if (data->cond || data->type == TL_UNLOCK)
212
431
  {
217
436
      else
218
437
        wait->last=data->prev;
219
438
      data->type= TL_UNLOCK;                    /* No lock */
 
439
      check_locks(data->lock, "killed or timed out wait_for_lock", 1);
220
440
      wake_up_waiters(data->lock);
221
441
    }
 
442
    else
 
443
    {
 
444
      check_locks(data->lock, "aborted wait_for_lock", 0);
 
445
    }
222
446
  }
223
447
  else
224
448
  {
225
449
    result= THR_LOCK_SUCCESS;
 
450
    if (data->lock->get_status)
 
451
      (*data->lock->get_status)(data->status_param, 0);
 
452
    check_locks(data->lock,"got wait_for_lock",0);
226
453
  }
227
 
  data->lock->unlock();
 
454
  pthread_mutex_unlock(&data->lock->mutex);
228
455
 
229
456
  /* The following must be done after unlock of lock->mutex */
230
 
  boost_unique_lock_t scopedLock(thread_var->mutex);
231
 
  thread_var->current_mutex= NULL;
232
 
  thread_var->current_cond= NULL;
 
457
  pthread_mutex_lock(&thread_var->mutex);
 
458
  thread_var->current_mutex= 0;
 
459
  thread_var->current_cond=  0;
 
460
  pthread_mutex_unlock(&thread_var->mutex);
233
461
  return(result);
234
462
}
235
463
 
236
464
 
237
 
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
 
465
enum enum_thr_lock_result
 
466
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
 
467
         enum thr_lock_type lock_type)
238
468
{
239
 
  THR_LOCK *lock= data->lock;
 
469
  THR_LOCK *lock=data->lock;
240
470
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
241
471
  struct st_lock_list *wait_queue;
242
472
  THR_LOCK_DATA *lock_owner;
245
475
  data->cond=0;                                 /* safety */
246
476
  data->type=lock_type;
247
477
  data->owner= owner;                           /* Must be reset ! */
248
 
  lock->lock();
 
478
  VOID(pthread_mutex_lock(&lock->mutex));
 
479
  check_locks(lock,(uint) lock_type <= (uint) TL_READ_NO_INSERT ?
 
480
              "enter read_lock" : "enter write_lock",0);
249
481
  if ((int) lock_type <= (int) TL_READ_NO_INSERT)
250
482
  {
251
483
    /* Request for READ lock */
261
493
      */
262
494
 
263
495
      if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
264
 
          (lock->write.data->type <= TL_WRITE_CONCURRENT_INSERT &&
265
 
           (((int) lock_type <= (int) TL_READ_WITH_SHARED_LOCKS) ||
 
496
          (lock->write.data->type <= TL_WRITE_DELAYED &&
 
497
           (((int) lock_type <= (int) TL_READ_HIGH_PRIORITY) ||
266
498
            (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT &&
267
499
             lock->write.data->type != TL_WRITE_ALLOW_READ))))
268
500
      {                                         /* Already got a write lock */
271
503
        lock->read.last= &data->next;
272
504
        if (lock_type == TL_READ_NO_INSERT)
273
505
          lock->read_no_write_count++;
274
 
        current_global_counters.locks_immediate++;
 
506
        check_locks(lock,"read lock with old write lock",0);
 
507
        if (lock->get_status)
 
508
          (*lock->get_status)(data->status_param, 0);
 
509
        statistic_increment(locks_immediate,&THR_LOCK_lock);
275
510
        goto end;
276
511
      }
277
512
      if (lock->write.data->type == TL_WRITE_ONLY)
283
518
      }
284
519
    }
285
520
    else if (!lock->write_wait.data ||
286
 
             lock->write_wait.data->type <= TL_WRITE_DEFAULT ||
 
521
             lock->write_wait.data->type <= TL_WRITE_LOW_PRIORITY ||
 
522
             lock_type == TL_READ_HIGH_PRIORITY ||
287
523
             have_old_read_lock(lock->read.data, data->owner))
288
524
    {                                           /* No important write-locks */
289
525
      (*lock->read.last)=data;                  /* Add to running FIFO */
290
526
      data->prev=lock->read.last;
291
527
      lock->read.last= &data->next;
 
528
      if (lock->get_status)
 
529
        (*lock->get_status)(data->status_param, 0);
292
530
      if (lock_type == TL_READ_NO_INSERT)
293
531
        lock->read_no_write_count++;
294
 
      current_global_counters.locks_immediate++;
 
532
      check_locks(lock,"read lock with no write locks",0);
 
533
      statistic_increment(locks_immediate,&THR_LOCK_lock);
295
534
      goto end;
296
535
    }
297
536
    /*
303
542
  }
304
543
  else                                          /* Request for WRITE lock */
305
544
  {
306
 
    if (lock_type == TL_WRITE_CONCURRENT_INSERT)
 
545
    if (lock_type == TL_WRITE_DELAYED)
 
546
    {
 
547
      if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY)
 
548
      {
 
549
        data->type=TL_UNLOCK;
 
550
        result= THR_LOCK_ABORTED;               /* Can't wait for this one */
 
551
        goto end;
 
552
      }
 
553
      /*
 
554
        if there is a TL_WRITE_ALLOW_READ lock, we have to wait for a lock
 
555
        (TL_WRITE_ALLOW_READ is used for ALTER TABLE in MySQL)
 
556
      */
 
557
      if ((!lock->write.data ||
 
558
           lock->write.data->type != TL_WRITE_ALLOW_READ) &&
 
559
          !have_specific_lock(lock->write_wait.data,TL_WRITE_ALLOW_READ) &&
 
560
          (lock->write.data || lock->read.data))
 
561
      {
 
562
        /* Add delayed write lock to write_wait queue, and return at once */
 
563
        (*lock->write_wait.last)=data;
 
564
        data->prev=lock->write_wait.last;
 
565
        lock->write_wait.last= &data->next;
 
566
        data->cond=get_cond();
 
567
        /*
 
568
          We don't have to do get_status here as we will do it when we change
 
569
          the delayed lock to a real write lock
 
570
        */
 
571
        statistic_increment(locks_immediate,&THR_LOCK_lock);
 
572
        goto end;
 
573
      }
 
574
    }
 
575
    else if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
307
576
      data->type=lock_type= thr_upgraded_concurrent_insert_lock;
308
577
 
309
578
    if (lock->write.data)                       /* If there is a write lock */
338
607
        (*lock->write.last)=data;       /* Add to running fifo */
339
608
        data->prev=lock->write.last;
340
609
        lock->write.last= &data->next;
341
 
        current_global_counters.locks_immediate++;
 
610
        check_locks(lock,"second write lock",0);
 
611
        if (data->lock->get_status)
 
612
          (*data->lock->get_status)(data->status_param, 0);
 
613
        statistic_increment(locks_immediate,&THR_LOCK_lock);
342
614
        goto end;
343
615
      }
344
616
    }
350
622
        if (lock_type == TL_WRITE_CONCURRENT_INSERT)
351
623
        {
352
624
          concurrent_insert= 1;
 
625
          if ((*lock->check_status)(data->status_param))
 
626
          {
 
627
            concurrent_insert= 0;
 
628
            data->type=lock_type= thr_upgraded_concurrent_insert_lock;
 
629
          }
353
630
        }
354
631
 
355
632
        if (!lock->read.data ||
356
 
            (lock_type <= TL_WRITE_CONCURRENT_INSERT &&
 
633
            (lock_type <= TL_WRITE_DELAYED &&
357
634
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
358
635
               lock_type != TL_WRITE_ALLOW_WRITE) ||
359
636
              !lock->read_no_write_count)))
361
638
          (*lock->write.last)=data;             /* Add as current write lock */
362
639
          data->prev=lock->write.last;
363
640
          lock->write.last= &data->next;
364
 
          current_global_counters.locks_immediate++;
 
641
          if (data->lock->get_status)
 
642
            (*data->lock->get_status)(data->status_param, concurrent_insert);
 
643
          check_locks(lock,"only write lock",0);
 
644
          statistic_increment(locks_immediate,&THR_LOCK_lock);
365
645
          goto end;
366
646
        }
367
647
      }
380
660
    result= THR_LOCK_DEADLOCK;
381
661
    goto end;
382
662
  }
383
 
 
384
663
  /* Can't get lock yet;  Wait for it */
385
 
  return(wait_for_lock(session, wait_queue, data));
 
664
  return(wait_for_lock(wait_queue, data, 0));
386
665
end:
387
 
  lock->unlock();
388
 
 
 
666
  pthread_mutex_unlock(&lock->mutex);
389
667
  return(result);
390
668
}
391
669
 
392
670
 
393
 
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
 
671
static inline void free_all_read_locks(THR_LOCK *lock,
 
672
                                       bool using_concurrent_insert)
394
673
{
395
 
  THR_LOCK_DATA *data= lock->read_wait.data;
 
674
  THR_LOCK_DATA *data=lock->read_wait.data;
 
675
 
 
676
  check_locks(lock,"before freeing read locks",1);
396
677
 
397
678
  /* move all locks from read_wait list to read list */
398
679
  (*lock->read.last)=data;
404
685
 
405
686
  do
406
687
  {
407
 
    boost::condition_variable_any *cond= data->cond;
 
688
    pthread_cond_t *cond=data->cond;
408
689
    if ((int) data->type == (int) TL_READ_NO_INSERT)
409
690
    {
410
691
      if (using_concurrent_insert)
411
692
      {
412
693
        /*
413
 
          We can't free this lock;
 
694
          We can't free this lock; 
414
695
          Link lock away from read chain back into read_wait chain
415
696
        */
416
697
        if (((*data->prev)=data->next))
423
704
        continue;
424
705
      }
425
706
      lock->read_no_write_count++;
426
 
    }
427
 
    data->cond= NULL;                           /* Mark thread free */
428
 
    cond->notify_one();
 
707
    }      
 
708
    data->cond=0;                               /* Mark thread free */
 
709
    VOID(pthread_cond_signal(cond));
429
710
  } while ((data=data->next));
430
711
  *lock->read_wait.last=0;
431
712
  if (!lock->read_wait.data)
432
713
    lock->write_lock_count=0;
 
714
  check_locks(lock,"after giving read locks",0);
433
715
}
434
716
 
435
 
/* Unlock lock and free next thread on same lock */
 
717
        /* Unlock lock and free next thread on same lock */
436
718
 
437
 
static void thr_unlock(THR_LOCK_DATA *data)
 
719
void thr_unlock(THR_LOCK_DATA *data)
438
720
{
439
721
  THR_LOCK *lock=data->lock;
440
722
  enum thr_lock_type lock_type=data->type;
441
 
  lock->lock();
 
723
  pthread_mutex_lock(&lock->mutex);
 
724
  check_locks(lock,"start of release lock",0);
442
725
 
443
726
  if (((*data->prev)=data->next))               /* remove from lock-list */
444
727
    data->next->prev= data->prev;
445
728
  else if (lock_type <= TL_READ_NO_INSERT)
446
729
    lock->read.last=data->prev;
 
730
  else if (lock_type == TL_WRITE_DELAYED && data->cond)
 
731
  {
 
732
    /*
 
733
      This only happens in extreme circumstances when a 
 
734
      write delayed lock that is waiting for a lock is released
 
735
    */
 
736
    lock->write_wait.last=data->prev;           /* Put it on wait queue */
 
737
  }
447
738
  else
448
739
    lock->write.last=data->prev;
449
740
  if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
450
 
  { }
 
741
  {
 
742
    if (lock->update_status)
 
743
      (*lock->update_status)(data->status_param);
 
744
  }
451
745
  else
452
 
  { }
 
746
  {
 
747
    if (lock->restore_status)
 
748
      (*lock->restore_status)(data->status_param);
 
749
  }
453
750
  if (lock_type == TL_READ_NO_INSERT)
454
751
    lock->read_no_write_count--;
455
752
  data->type=TL_UNLOCK;                         /* Mark unlocked */
 
753
  check_locks(lock,"after releasing lock",1);
456
754
  wake_up_waiters(lock);
457
 
  lock->unlock();
 
755
  pthread_mutex_unlock(&lock->mutex);
 
756
  return;
458
757
}
459
758
 
460
759
 
478
777
    {
479
778
      /* Release write-locks with TL_WRITE or TL_WRITE_ONLY priority first */
480
779
      if (data &&
481
 
          (!lock->read_wait.data || lock->read_wait.data->type <= TL_READ_WITH_SHARED_LOCKS))
 
780
          (data->type != TL_WRITE_LOW_PRIORITY || !lock->read_wait.data ||
 
781
           lock->read_wait.data->type < TL_READ_HIGH_PRIORITY))
482
782
      {
483
783
        if (lock->write_lock_count++ > max_write_lock_count)
484
784
        {
500
800
          data->prev=lock->write.last;
501
801
          data->next=0;
502
802
          lock->write.last= &data->next;
503
 
 
 
803
          if (data->type == TL_WRITE_CONCURRENT_INSERT &&
 
804
              (*lock->check_status)(data->status_param))
 
805
            data->type=TL_WRITE;                        /* Upgrade lock */
504
806
          {
505
 
            boost::condition_variable_any *cond= data->cond;
506
 
            data->cond= NULL;                           /* Mark thread free */
507
 
            cond->notify_one(); /* Start waiting thread */
 
807
            pthread_cond_t *cond=data->cond;
 
808
            data->cond=0;                               /* Mark thread free */
 
809
            VOID(pthread_cond_signal(cond));    /* Start waiting thread */
508
810
          }
509
811
          if (data->type != TL_WRITE_ALLOW_WRITE ||
510
812
              !lock->write_wait.data ||
512
814
            break;
513
815
          data=lock->write_wait.data;           /* Free this too */
514
816
        }
515
 
        if (data->type >= TL_WRITE)
 
817
        if (data->type >= TL_WRITE_LOW_PRIORITY)
516
818
          goto end;
517
819
        /* Release possible read locks together with the write lock */
518
820
      }
523
825
                             data->type == TL_WRITE_ALLOW_WRITE));
524
826
    }
525
827
    else if (data &&
526
 
             (lock_type=data->type) <= TL_WRITE_CONCURRENT_INSERT &&
 
828
             (lock_type=data->type) <= TL_WRITE_DELAYED &&
527
829
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
528
830
               lock_type != TL_WRITE_ALLOW_WRITE) ||
529
831
              !lock->read_no_write_count))
530
832
    {
 
833
      /*
 
834
        For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
 
835
        start WRITE locks together with the READ locks
 
836
      */
 
837
      if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
 
838
          (*lock->check_status)(data->status_param))
 
839
      {
 
840
        data->type=TL_WRITE;                    /* Upgrade lock */
 
841
        if (lock->read_wait.data)
 
842
          free_all_read_locks(lock,0);
 
843
        goto end;
 
844
      }
531
845
      do {
532
 
        boost::condition_variable_any *cond= data->cond;
 
846
        pthread_cond_t *cond=data->cond;
533
847
        if (((*data->prev)=data->next))         /* remove from wait-list */
534
848
          data->next->prev= data->prev;
535
849
        else
538
852
        data->prev=lock->write.last;
539
853
        lock->write.last= &data->next;
540
854
        data->next=0;                           /* Only one write lock */
541
 
        data->cond= NULL;                               /* Mark thread free */
542
 
        cond->notify_one(); /* Start waiting thread */
 
855
        data->cond=0;                           /* Mark thread free */
 
856
        VOID(pthread_cond_signal(cond));        /* Start waiting thread */
543
857
      } while (lock_type == TL_WRITE_ALLOW_WRITE &&
544
858
               (data=lock->write_wait.data) &&
545
859
               data->type == TL_WRITE_ALLOW_WRITE);
549
863
                             lock_type == TL_WRITE_ALLOW_WRITE));
550
864
    }
551
865
    else if (!data && lock->read_wait.data)
552
 
    {
553
866
      free_all_read_locks(lock,0);
554
 
    }
555
867
  }
556
868
end:
 
869
  check_locks(lock, "after waking up waiters", 0);
557
870
  return;
558
871
}
559
872
 
565
878
*/
566
879
 
567
880
 
568
 
#define LOCK_CMP(A,B) ((unsigned char*) (A->lock) - (uint32_t) ((A)->type) < (unsigned char*) (B->lock)- (uint32_t) ((B)->type))
 
881
#define LOCK_CMP(A,B) ((uchar*) (A->lock) - (uint) ((A)->type) < (uchar*) (B->lock)- (uint) ((B)->type))
569
882
 
570
 
static void sort_locks(THR_LOCK_DATA **data,uint32_t count)
 
883
static void sort_locks(THR_LOCK_DATA **data,uint count)
571
884
{
572
885
  THR_LOCK_DATA **pos,**end,**prev,*tmp;
573
886
 
589
902
 
590
903
 
591
904
enum enum_thr_lock_result
592
 
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
 
905
thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_OWNER *owner)
593
906
{
594
907
  THR_LOCK_DATA **pos,**end;
595
908
  if (count > 1)
597
910
  /* lock everything */
598
911
  for (pos=data,end=data+count; pos < end ; pos++)
599
912
  {
600
 
    enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
 
913
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
601
914
    if (result != THR_LOCK_SUCCESS)
602
915
    {                                           /* Aborted */
603
 
      thr_multi_unlock(data,(uint32_t) (pos-data));
 
916
      thr_multi_unlock(data,(uint) (pos-data));
604
917
      return(result);
605
918
    }
 
919
#ifdef MAIN
 
920
    printf("Thread: %s  Got lock: 0x%lx  type: %d\n",my_thread_name(),
 
921
           (long) pos[0]->lock, pos[0]->type); fflush(stdout);
 
922
#endif
606
923
  }
607
924
  /*
608
925
    Ensure that all get_locks() have the same status
617
934
    do
618
935
    {
619
936
      pos--;
620
 
      last_lock=(*pos);
 
937
      if (last_lock->lock == (*pos)->lock &&
 
938
          last_lock->lock->copy_status)
 
939
      {
 
940
        if (last_lock->type <= TL_READ_NO_INSERT)
 
941
        {
 
942
          THR_LOCK_DATA **read_lock;
 
943
          /*
 
944
            If we are locking the same table with read locks we must ensure
 
945
            that all tables share the status of the last write lock or
 
946
            the same read lock.
 
947
          */
 
948
          for (;
 
949
               (*pos)->type <= TL_READ_NO_INSERT &&
 
950
                 pos != data &&
 
951
                 pos[-1]->lock == (*pos)->lock ;
 
952
               pos--) ;
 
953
 
 
954
          read_lock = pos+1;
 
955
          do
 
956
          {
 
957
            (last_lock->lock->copy_status)((*read_lock)->status_param,
 
958
                                           (*pos)->status_param);
 
959
          } while (*(read_lock++) != last_lock);
 
960
          last_lock= (*pos);                    /* Point at last write lock */
 
961
        }
 
962
        else
 
963
          (*last_lock->lock->copy_status)((*pos)->status_param,
 
964
                                          last_lock->status_param);
 
965
      }
 
966
      else
 
967
        last_lock=(*pos);
621
968
    } while (pos != data);
622
969
  }
623
970
#endif
626
973
 
627
974
  /* free all locks */
628
975
 
629
 
void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count)
 
976
void thr_multi_unlock(THR_LOCK_DATA **data,uint count)
630
977
{
631
978
  THR_LOCK_DATA **pos,**end;
632
979
 
633
980
  for (pos=data,end=data+count; pos < end ; pos++)
634
981
  {
 
982
#ifdef MAIN
 
983
    printf("Thread: %s  Rel lock: 0x%lx  type: %d\n",
 
984
           my_thread_name(), (long) pos[0]->lock, pos[0]->type);
 
985
    fflush(stdout);
 
986
#endif
635
987
    if ((*pos)->type != TL_UNLOCK)
636
988
      thr_unlock(*pos);
637
989
  }
638
990
  return;
639
991
}
640
992
 
641
 
void DrizzleLock::unlock(uint32_t count)
642
 
{
643
 
  THR_LOCK_DATA **pos,**end;
644
 
 
645
 
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
646
 
  {
647
 
    if ((*pos)->type != TL_UNLOCK)
648
 
      thr_unlock(*pos);
649
 
  }
650
 
}
651
 
 
652
993
/*
653
994
  Abort all threads waiting for a lock. The lock will be upgraded to
654
995
  TL_WRITE_ONLY to abort any new accesses to the lock
655
996
*/
656
997
 
657
 
void THR_LOCK::abort_locks()
 
998
void thr_abort_locks(THR_LOCK *lock, bool upgrade_lock)
658
999
{
659
 
  boost_unique_lock_t scopedLock(mutex);
 
1000
  THR_LOCK_DATA *data;
 
1001
  pthread_mutex_lock(&lock->mutex);
660
1002
 
661
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
1003
  for (data=lock->read_wait.data; data ; data=data->next)
662
1004
  {
663
 
    local_data->type= TL_UNLOCK;                        /* Mark killed */
 
1005
    data->type=TL_UNLOCK;                       /* Mark killed */
664
1006
    /* It's safe to signal the cond first: we're still holding the mutex. */
665
 
    local_data->cond->notify_one();
666
 
    local_data->cond= NULL;                             /* Removed from list */
 
1007
    pthread_cond_signal(data->cond);
 
1008
    data->cond=0;                               /* Removed from list */
667
1009
  }
668
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
1010
  for (data=lock->write_wait.data; data ; data=data->next)
669
1011
  {
670
 
    local_data->type= TL_UNLOCK;
671
 
    local_data->cond->notify_one();
672
 
    local_data->cond= NULL;
 
1012
    data->type=TL_UNLOCK;
 
1013
    pthread_cond_signal(data->cond);
 
1014
    data->cond=0;
673
1015
  }
674
 
  read_wait.last= &read_wait.data;
675
 
  write_wait.last= &write_wait.data;
676
 
  read_wait.data= write_wait.data=0;
677
 
  if (write.data)
678
 
    write.data->type=TL_WRITE_ONLY;
 
1016
  lock->read_wait.last= &lock->read_wait.data;
 
1017
  lock->write_wait.last= &lock->write_wait.data;
 
1018
  lock->read_wait.data=lock->write_wait.data=0;
 
1019
  if (upgrade_lock && lock->write.data)
 
1020
    lock->write.data->type=TL_WRITE_ONLY;
 
1021
  pthread_mutex_unlock(&lock->mutex);
 
1022
  return;
679
1023
}
680
1024
 
681
1025
 
685
1029
  This is used to abort all locks for a specific thread
686
1030
*/
687
1031
 
688
 
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
 
1032
bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
689
1033
{
 
1034
  THR_LOCK_DATA *data;
690
1035
  bool found= false;
691
1036
 
692
 
  boost_unique_lock_t scopedLock(mutex);
693
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
1037
  pthread_mutex_lock(&lock->mutex);
 
1038
  for (data= lock->read_wait.data; data ; data= data->next)
694
1039
  {
695
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
1040
    if (data->owner->info->thread_id == thread_id)    /* purecov: tested */
696
1041
    {
697
 
      local_data->type= TL_UNLOCK;                      /* Mark killed */
 
1042
      data->type= TL_UNLOCK;                    /* Mark killed */
698
1043
      /* It's safe to signal the cond first: we're still holding the mutex. */
699
1044
      found= true;
700
 
      local_data->cond->notify_one();
701
 
      local_data->cond= 0;                              /* Removed from list */
 
1045
      pthread_cond_signal(data->cond);
 
1046
      data->cond= 0;                            /* Removed from list */
702
1047
 
703
 
      if (((*local_data->prev)= local_data->next))
704
 
        local_data->next->prev= local_data->prev;
 
1048
      if (((*data->prev)= data->next))
 
1049
        data->next->prev= data->prev;
705
1050
      else
706
 
        read_wait.last= local_data->prev;
 
1051
        lock->read_wait.last= data->prev;
707
1052
    }
708
1053
  }
709
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
1054
  for (data= lock->write_wait.data; data ; data= data->next)
710
1055
  {
711
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
1056
    if (data->owner->info->thread_id == thread_id) /* purecov: tested */
712
1057
    {
713
 
      local_data->type= TL_UNLOCK;
 
1058
      data->type= TL_UNLOCK;
714
1059
      found= true;
715
 
      local_data->cond->notify_one();
716
 
      local_data->cond= NULL;
717
 
 
718
 
      if (((*local_data->prev)= local_data->next))
719
 
        local_data->next->prev= local_data->prev;
720
 
      else
721
 
        write_wait.last= local_data->prev;
722
 
    }
723
 
  }
724
 
  wake_up_waiters(this);
725
 
 
726
 
  return found;
727
 
}
728
 
 
729
 
} /* namespace drizzled */
 
1060
      pthread_cond_signal(data->cond);
 
1061
      data->cond= 0;
 
1062
 
 
1063
      if (((*data->prev)= data->next))
 
1064
        data->next->prev= data->prev;
 
1065
      else
 
1066
        lock->write_wait.last= data->prev;
 
1067
    }
 
1068
  }
 
1069
  wake_up_waiters(lock);
 
1070
  pthread_mutex_unlock(&lock->mutex);
 
1071
  return(found);
 
1072
}
 
1073
 
 
1074
 
 
1075
/*
 
1076
  Downgrade a WRITE_* to a lower WRITE level
 
1077
  SYNOPSIS
 
1078
    thr_downgrade_write_lock()
 
1079
    in_data                   Lock data of thread downgrading its lock
 
1080
    new_lock_type             New write lock type
 
1081
  RETURN VALUE
 
1082
    NONE
 
1083
  DESCRIPTION
 
1084
    This can be used to downgrade a lock already owned. When the downgrade
 
1085
    occurs also other waiters, both readers and writers can be allowed to
 
1086
    start.
 
1087
    The previous lock is often TL_WRITE_ONLY but can also be
 
1088
    TL_WRITE and TL_WRITE_ALLOW_READ. The normal downgrade variants are
 
1089
    TL_WRITE_ONLY => TL_WRITE_ALLOW_READ After a short exclusive lock
 
1090
    TL_WRITE_ALLOW_READ => TL_WRITE_ALLOW_WRITE After discovering that the
 
1091
    operation didn't need such a high lock.
 
1092
    TL_WRITE_ONLY => TL_WRITE after a short exclusive lock while holding a
 
1093
    write table lock
 
1094
    TL_WRITE_ONLY => TL_WRITE_ALLOW_WRITE After a short exclusive lock after
 
1095
    already earlier having downgraded lock to TL_WRITE_ALLOW_WRITE
 
1096
    The implementation is conservative: it would rather not start waiters than
 
1097
    go on unknown paths to start, the common cases are handled.
 
1098
 
 
1099
    NOTE:
 
1100
    In its current implementation it is only allowed to downgrade from
 
1101
    TL_WRITE_ONLY. In this case there are no waiters. Thus no wake up
 
1102
    logic is required.
 
1103
*/
 
1104
 
 
1105
/*
  Set the type of an already-owned lock request to new_lock_type.

  The change is made while holding the mutex of the THR_LOCK that
  in_data belongs to. No waiters are woken here.
*/
void thr_downgrade_write_lock(THR_LOCK_DATA *in_data,
                              enum thr_lock_type new_lock_type)
{
  THR_LOCK *owning_lock= in_data->lock;

  pthread_mutex_lock(&owning_lock->mutex);

  in_data->type= new_lock_type;
  check_locks(owning_lock,"after downgrading lock",0);

  pthread_mutex_unlock(&owning_lock->mutex);
}
 
1117
 
 
1118
/* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */
 
1119
 
 
1120
/*
  Upgrade a delayed write lock to TL_WRITE.

  PARAMETERS
    data        Lock request to upgrade

  RETURN VALUE
    - If the request is already TL_UNLOCK (aborted) or is already at
      TL_WRITE_LOW_PRIORITY or higher: non-zero only in the aborted case.
    - 0 if the lock is held and there are no active read locks.
    - Otherwise the result of wait_for_lock() on the write_wait list.
*/
bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
{
  THR_LOCK *lock=data->lock;

  pthread_mutex_lock(&lock->mutex);
  if (data->type == TL_UNLOCK || data->type >= TL_WRITE_LOW_PRIORITY)
  {
    /*
      Sample the state while the mutex is still held; data->type is
      modified by other threads under this mutex, so reading it after
      the unlock could race with them.
    */
    const bool aborted= (data->type == TL_UNLOCK);   /* Test if Aborted */
    pthread_mutex_unlock(&lock->mutex);
    return(aborted);
  }
  check_locks(lock,"before upgrading lock",0);
  /* TODO:  Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */
  data->type=TL_WRITE;                          /* Upgrade lock */

  /* Check if someone has given us the lock */
  if (!data->cond)
  {
    if (!lock->read.data)                       /* No read locks */
    {                                           /* We have the lock */
      if (data->lock->get_status)
        (*data->lock->get_status)(data->status_param, 0);
      pthread_mutex_unlock(&lock->mutex);
      return(0);
    }

    if (((*data->prev)=data->next))             /* remove from lock-list */
      data->next->prev= data->prev;
    else
      lock->write.last=data->prev;

    if ((data->next=lock->write_wait.data))     /* Put first in lock_list */
      data->next->prev= &data->next;
    else
      lock->write_wait.last= &data->next;
    data->prev= &lock->write_wait.data;
    lock->write_wait.data=data;
    check_locks(lock,"upgrading lock",0);
  }
  else
  {
    check_locks(lock,"waiting for lock",0);
  }
  /*
    The mutex is still held here.
    NOTE(review): wait_for_lock() presumably releases it - confirm.
  */
  return(wait_for_lock(&lock->write_wait,data,1));
}
 
1164
 
 
1165
 
 
1166
/* Downgrade a WRITE lock to a WRITE_DELAY lock if there are pending locks */
 
1167
 
 
1168
/*
  Give waiting readers a chance to run before this writer continues.

  If no read lock is waiting, nothing is done and 0 is returned.
  Otherwise the request is turned into TL_WRITE_DELAYED, moved from the
  active write list to the head of the write_wait list, and
  free_all_read_locks() is called. The result of
  thr_upgrade_write_delay_lock() is then returned.
*/
bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
{
  THR_LOCK *lock= data->lock;

  pthread_mutex_lock(&lock->mutex);
  if (!lock->read_wait.data)
  {
    /* No waiting read locks: keep the write lock as it is */
    pthread_mutex_unlock(&lock->mutex);
    return(0);
  }

  data->type= TL_WRITE_DELAYED;
  if (lock->update_status)
  {
    (*lock->update_status)(data->status_param);
  }

  /* Unlink the request from the list of active write locks */
  *data->prev= data->next;
  if (data->next)
  {
    data->next->prev= data->prev;
  }
  else
  {
    lock->write.last= data->prev;
  }

  /* Link it in as the first element of the write_wait list */
  data->next= lock->write_wait.data;
  if (data->next)
  {
    data->next->prev= &data->next;
  }
  else
  {
    lock->write_wait.last= &data->next;
  }
  data->prev= &lock->write_wait.data;
  data->cond= get_cond();                       /* This was zero */
  lock->write_wait.data= data;
  free_all_read_locks(lock,0);

  pthread_mutex_unlock(&lock->mutex);
  /* The upgrade call takes the mutex again on its own */
  return(thr_upgrade_write_delay_lock(data));
}
 
1199
 
 
1200
 
 
1201
#include <my_sys.h>
 
1202
 
 
1203
static void thr_print_lock(const char* name,struct st_lock_list *list)
 
1204
{
 
1205
  THR_LOCK_DATA *data,**prev;
 
1206
  uint count=0;
 
1207
 
 
1208
  if (list->data)
 
1209
  {
 
1210
    printf("%-10s: ",name);
 
1211
    prev= &list->data;
 
1212
    for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next)
 
1213
    {
 
1214
      printf("0x%lx (%lu:%d); ", (ulong) data, data->owner->info->thread_id,
 
1215
             (int) data->type);
 
1216
      if (data->prev != prev)
 
1217
        printf("\nWarning: prev didn't point at previous lock\n");
 
1218
      prev= &data->next;
 
1219
    }
 
1220
    puts("");
 
1221
    if (prev != list->last)
 
1222
      printf("Warning: last didn't point at last lock\n");
 
1223
  }
 
1224
}
 
1225
 
 
1226
void thr_print_locks(void)
 
1227
{
 
1228
  LIST *list;
 
1229
  uint count=0;
 
1230
 
 
1231
  pthread_mutex_lock(&THR_LOCK_lock);
 
1232
  puts("Current locks:");
 
1233
  for (list= thr_lock_thread_list; list && count++ < MAX_THREADS;
 
1234
       list= list_rest(list))
 
1235
  {
 
1236
    THR_LOCK *lock=(THR_LOCK*) list->data;
 
1237
    VOID(pthread_mutex_lock(&lock->mutex));
 
1238
    printf("lock: 0x%lx:",(ulong) lock);
 
1239
    if ((lock->write_wait.data || lock->read_wait.data) &&
 
1240
        (! lock->read.data && ! lock->write.data))
 
1241
      printf(" WARNING: ");
 
1242
    if (lock->write.data)
 
1243
      printf(" write");
 
1244
    if (lock->write_wait.data)
 
1245
      printf(" write_wait");
 
1246
    if (lock->read.data)
 
1247
      printf(" read");
 
1248
    if (lock->read_wait.data)
 
1249
      printf(" read_wait");
 
1250
    puts("");
 
1251
    thr_print_lock("write",&lock->write);
 
1252
    thr_print_lock("write_wait",&lock->write_wait);
 
1253
    thr_print_lock("read",&lock->read);
 
1254
    thr_print_lock("read_wait",&lock->read_wait);
 
1255
    VOID(pthread_mutex_unlock(&lock->mutex));
 
1256
    puts("");
 
1257
  }
 
1258
  fflush(stdout);
 
1259
  pthread_mutex_unlock(&THR_LOCK_lock);
 
1260
}
 
1261
 
 
1262
/*****************************************************************************
 
1263
** Test of thread locks
 
1264
****************************************************************************/
 
1265
 
 
1266
#ifdef MAIN
 
1267
 
 
1268
struct st_test {
 
1269
  uint lock_nr;
 
1270
  enum thr_lock_type lock_type;
 
1271
};
 
1272
 
 
1273
THR_LOCK locks[5];                      /* 5 locks; the tests use lock_nr 0..4 */
 
1274
 
 
1275
struct st_test test_0[] = {{0,TL_READ}};        /* One lock */
 
1276
struct st_test test_1[] = {{0,TL_READ},{0,TL_WRITE}}; /* Read and write lock of lock 0 */
 
1277
struct st_test test_2[] = {{1,TL_WRITE},{0,TL_READ},{2,TL_READ}};
 
1278
struct st_test test_3[] = {{2,TL_WRITE},{1,TL_READ},{0,TL_READ}}; /* Deadlock with test_2 ? */
 
1279
struct st_test test_4[] = {{0,TL_WRITE},{0,TL_READ},{0,TL_WRITE},{0,TL_READ}};
 
1280
struct st_test test_5[] = {{0,TL_READ},{1,TL_READ},{2,TL_READ},{3,TL_READ}}; /* Many reads */
 
1281
struct st_test test_6[] = {{0,TL_WRITE},{1,TL_WRITE},{2,TL_WRITE},{3,TL_WRITE}}; /* Many writes */
 
1282
struct st_test test_7[] = {{3,TL_READ}};
 
1283
struct st_test test_8[] = {{1,TL_READ_NO_INSERT},{2,TL_READ_NO_INSERT},{3,TL_READ_NO_INSERT}};  /* Should be quick */
 
1284
struct st_test test_9[] = {{4,TL_READ_HIGH_PRIORITY}};
 
1285
struct st_test test_10[] ={{4,TL_WRITE}};
 
1286
struct st_test test_11[] = {{0,TL_WRITE_LOW_PRIORITY},{1,TL_WRITE_LOW_PRIORITY},{2,TL_WRITE_LOW_PRIORITY},{3,TL_WRITE_LOW_PRIORITY}}; /* Many writes */
 
1287
struct st_test test_12[] = {{0,TL_WRITE_ALLOW_READ},{1,TL_WRITE_ALLOW_READ},{2,TL_WRITE_ALLOW_READ},{3,TL_WRITE_ALLOW_READ}}; /* Many writes */
 
1288
struct st_test test_13[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_WRITE_CONCURRENT_INSERT},{2,TL_WRITE_CONCURRENT_INSERT},{3,TL_WRITE_CONCURRENT_INSERT}};
 
1289
struct st_test test_14[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_READ}};
 
1290
struct st_test test_15[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_READ}};
 
1291
struct st_test test_16[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_WRITE_ALLOW_WRITE}};
 
1292
 
 
1293
struct st_test *tests[] = {test_0,test_1,test_2,test_3,test_4,test_5,test_6,
 
1294
                           test_7,test_8,test_9,test_10,test_11,test_12,
 
1295
                           test_13,test_14,test_15,test_16};
 
1296
int lock_counts[]= {sizeof(test_0)/sizeof(struct st_test),
 
1297
                    sizeof(test_1)/sizeof(struct st_test),
 
1298
                    sizeof(test_2)/sizeof(struct st_test),
 
1299
                    sizeof(test_3)/sizeof(struct st_test),
 
1300
                    sizeof(test_4)/sizeof(struct st_test),
 
1301
                    sizeof(test_5)/sizeof(struct st_test),
 
1302
                    sizeof(test_6)/sizeof(struct st_test),
 
1303
                    sizeof(test_7)/sizeof(struct st_test),
 
1304
                    sizeof(test_8)/sizeof(struct st_test),
 
1305
                    sizeof(test_9)/sizeof(struct st_test),
 
1306
                    sizeof(test_10)/sizeof(struct st_test),
 
1307
                    sizeof(test_11)/sizeof(struct st_test),
 
1308
                    sizeof(test_12)/sizeof(struct st_test),
 
1309
                    sizeof(test_13)/sizeof(struct st_test),
 
1310
                    sizeof(test_14)/sizeof(struct st_test),
 
1311
                    sizeof(test_15)/sizeof(struct st_test),
 
1312
                    sizeof(test_16)/sizeof(struct st_test)
 
1313
};
 
1314
 
 
1315
 
 
1316
static pthread_cond_t COND_thread_count;
 
1317
static pthread_mutex_t LOCK_thread_count;
 
1318
static uint thread_count;
 
1319
static ulong sum=0;
 
1320
 
 
1321
#define MAX_LOCK_COUNT 8
 
1322
 
 
1323
/* The following functions are for WRITE_CONCURRENT_INSERT */
 
1324
 
 
1325
/* get_status callback for the test locks: does nothing */
static void test_get_status(void* param __attribute__((unused)),
                            int concurrent_insert __attribute__((unused)))
{
  /* Intentionally empty */
}
 
1329
 
 
1330
/* update_status callback for the test locks: does nothing */
static void test_update_status(void* param __attribute__((unused)))
{
  /* Intentionally empty */
}
 
1333
 
 
1334
/* copy_status callback for the test locks: does nothing */
static void test_copy_status(void* to __attribute__((unused)) ,
                             void *from __attribute__((unused)))
{
  /* Intentionally empty */
}
 
1338
 
 
1339
/* check_status callback for the test locks: always returns 0 */
static bool test_check_status(void* param __attribute__((unused)))
{
  return false;
}
 
1343
 
 
1344
 
 
1345
static void *test_thread(void *arg)
 
1346
{
 
1347
  int i,j,param=*((int*) arg);
 
1348
  THR_LOCK_DATA data[MAX_LOCK_COUNT];
 
1349
  THR_LOCK_OWNER owner;
 
1350
  THR_LOCK_INFO lock_info;
 
1351
  THR_LOCK_DATA *multi_locks[MAX_LOCK_COUNT];
 
1352
  my_thread_init();
 
1353
 
 
1354
  printf("Thread %s (%d) started\n",my_thread_name(),param); fflush(stdout);
 
1355
 
 
1356
 
 
1357
  thr_lock_info_init(&lock_info);
 
1358
  thr_lock_owner_init(&owner, &lock_info);
 
1359
  for (i=0; i < lock_counts[param] ; i++)
 
1360
    thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL);
 
1361
  for (j=1 ; j < 10 ; j++)              /* try locking 10 times */
 
1362
  {
 
1363
    for (i=0; i < lock_counts[param] ; i++)
 
1364
    {                                   /* Init multi locks */
 
1365
      multi_locks[i]= &data[i];
 
1366
      data[i].type= tests[param][i].lock_type;
 
1367
    }
 
1368
    thr_multi_lock(multi_locks, lock_counts[param], &owner);
 
1369
    pthread_mutex_lock(&LOCK_thread_count);
 
1370
    {
 
1371
      int tmp=rand() & 7;                       /* Do something from 0-2 sec */
 
1372
      if (tmp == 0)
 
1373
        sleep(1);
 
1374
      else if (tmp == 1)
 
1375
        sleep(2);
 
1376
      else
 
1377
      {
 
1378
        ulong k;
 
1379
        for (k=0 ; k < (ulong) (tmp-2)*100000L ; k++)
 
1380
          sum+=k;
 
1381
      }
 
1382
    }
 
1383
    pthread_mutex_unlock(&LOCK_thread_count);
 
1384
    thr_multi_unlock(multi_locks,lock_counts[param]);
 
1385
  }
 
1386
 
 
1387
  printf("Thread %s (%d) ended\n",my_thread_name(),param); fflush(stdout);
 
1388
  thr_print_locks();
 
1389
  pthread_mutex_lock(&LOCK_thread_count);
 
1390
  thread_count--;
 
1391
  VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */
 
1392
  pthread_mutex_unlock(&LOCK_thread_count);
 
1393
  free((uchar*) arg);
 
1394
  return 0;
 
1395
}
 
1396
 
 
1397
 
 
1398
int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
 
1399
{
 
1400
  pthread_t tid;
 
1401
  pthread_attr_t thr_attr;
 
1402
  int i,*param,error;
 
1403
  MY_INIT(argv[0]);
 
1404
 
 
1405
  printf("Main thread: %s\n",my_thread_name());
 
1406
 
 
1407
  if ((error=pthread_cond_init(&COND_thread_count,NULL)))
 
1408
  {
 
1409
    fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
 
1410
            error,errno);
 
1411
    exit(1);
 
1412
  }
 
1413
  if ((error=pthread_mutex_init(&LOCK_thread_count,MY_MUTEX_INIT_FAST)))
 
1414
  {
 
1415
    fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
 
1416
            error,errno);
 
1417
    exit(1);
 
1418
  }
 
1419
 
 
1420
  for (i=0 ; i < (int) array_elements(locks) ; i++)
 
1421
  {
 
1422
    thr_lock_init(locks+i);
 
1423
    locks[i].check_status= test_check_status;
 
1424
    locks[i].update_status=test_update_status;
 
1425
    locks[i].copy_status=  test_copy_status;
 
1426
    locks[i].get_status=   test_get_status;
 
1427
  }
 
1428
  if ((error=pthread_attr_init(&thr_attr)))
 
1429
  {
 
1430
    fprintf(stderr,"Got error: %d from pthread_attr_init (errno: %d)",
 
1431
            error,errno);
 
1432
    exit(1);
 
1433
  }
 
1434
  if ((error=pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED)))
 
1435
  {
 
1436
    fprintf(stderr,
 
1437
            "Got error: %d from pthread_attr_setdetachstate (errno: %d)",
 
1438
            error,errno);
 
1439
    exit(1);
 
1440
  }
 
1441
#ifndef pthread_attr_setstacksize               /* void return value */
 
1442
  if ((error=pthread_attr_setstacksize(&thr_attr,65536L)))
 
1443
  {
 
1444
    fprintf(stderr,"Got error: %d from pthread_attr_setstacksize (errno: %d)",
 
1445
            error,errno);
 
1446
    exit(1);
 
1447
  }
 
1448
#endif
 
1449
#ifdef HAVE_THR_SETCONCURRENCY
 
1450
  VOID(thr_setconcurrency(2));
 
1451
#endif
 
1452
  for (i=0 ; i < (int) array_elements(lock_counts) ; i++)
 
1453
  {
 
1454
    param=(int*) malloc(sizeof(int));
 
1455
    *param=i;
 
1456
 
 
1457
    if ((error=pthread_mutex_lock(&LOCK_thread_count)))
 
1458
    {
 
1459
      fprintf(stderr,"Got error: %d from pthread_mutex_lock (errno: %d)",
 
1460
              error,errno);
 
1461
      exit(1);
 
1462
    }
 
1463
    if ((error=pthread_create(&tid,&thr_attr,test_thread,(void*) param)))
 
1464
    {
 
1465
      fprintf(stderr,"Got error: %d from pthread_create (errno: %d)\n",
 
1466
              error,errno);
 
1467
      pthread_mutex_unlock(&LOCK_thread_count);
 
1468
      exit(1);
 
1469
    }
 
1470
    thread_count++;
 
1471
    pthread_mutex_unlock(&LOCK_thread_count);
 
1472
  }
 
1473
 
 
1474
  pthread_attr_destroy(&thr_attr);
 
1475
  if ((error=pthread_mutex_lock(&LOCK_thread_count)))
 
1476
    fprintf(stderr,"Got error: %d from pthread_mutex_lock\n",error);
 
1477
  while (thread_count)
 
1478
  {
 
1479
    if ((error=pthread_cond_wait(&COND_thread_count,&LOCK_thread_count)))
 
1480
      fprintf(stderr,"Got error: %d from pthread_cond_wait\n",error);
 
1481
  }
 
1482
  if ((error=pthread_mutex_unlock(&LOCK_thread_count)))
 
1483
    fprintf(stderr,"Got error: %d from pthread_mutex_unlock\n",error);
 
1484
  for (i=0 ; i < (int) array_elements(locks) ; i++)
 
1485
    thr_lock_delete(locks+i);
 
1486
#ifdef EXTRA_DEBUG
 
1487
  if (found_errors)
 
1488
    printf("Got %d warnings\n",found_errors);
 
1489
  else
 
1490
#endif
 
1491
    printf("Test succeeded\n");
 
1492
  return 0;
 
1493
}
 
1494
 
 
1495
#endif /* MAIN */