~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/thr_lock.c

  • Committer: Mark Atwood
  • Date: 2008-10-16 11:33:16 UTC
  • mto: (520.1.13 drizzle)
  • mto: This revision was merged to the branch mainline in revision 530.
  • Revision ID: mark@fallenpegasus.com-20081016113316-ff6jdt31ck90sjdh
an implementation of the errmsg plugin

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All threads must acquire
26
26
 
27
27
TL_READ                 # Low priority read
28
28
TL_READ_WITH_SHARED_LOCKS
 
29
TL_READ_HIGH_PRIORITY   # High priority read
29
30
TL_READ_NO_INSERT       # Read without concurrent inserts
30
31
TL_WRITE_ALLOW_WRITE    # Write lock that allows other writers
31
32
TL_WRITE_ALLOW_READ     # Write lock, but allow reading
32
33
TL_WRITE_CONCURRENT_INSERT
33
34
                        # Insert that can be mixed with selects
 
35
TL_WRITE_DELAYED        # Used by delayed insert
 
36
                        # Allows lower locks to take over
 
37
TL_WRITE_LOW_PRIORITY   # Low priority write
34
38
TL_WRITE                # High priority write
35
39
TL_WRITE_ONLY           # High priority write
36
40
                        # Abort all new lock request with an error
46
50
should put a pointer to the following functions in the lock structure:
47
51
(If the pointer is zero (default), the function is not called)
48
52
 
 
53
check_status:
 
54
         Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
 
55
         we check if this function exists and returns 0.
 
56
         If not, then the lock is upgraded to TL_WRITE
 
57
         In MyISAM this is a simple check if the insert can be done
 
58
         at the end of the datafile.
 
59
update_status:
 
60
        Before a write lock is released, this function is called.
 
61
        In MyISAM this function updates the count and length of the datafile
 
62
get_status:
 
63
        When one gets a lock this function is called.
 
64
        In MyISAM this stores the number of rows and size of the datafile
 
65
        for concurrent reads.
49
66
 
50
67
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
 
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
 
68
TL_WRITE_CONCURRENT_INSERT or one TL_WRITE_DELAYED lock at the same time as
 
69
multiple read locks.
52
70
 
53
71
*/
54
72
 
55
 
#include "config.h"
56
 
#include "drizzled/internal/my_sys.h"
57
 
#include "drizzled/internal/thread_var.h"
58
 
#include "drizzled/statistics_variables.h"
59
 
#include "drizzled/pthread_globals.h"
60
 
 
61
 
#include "drizzled/session.h"
62
 
#include "drizzled/current_session.h"
 
73
#include "mysys_priv.h"
63
74
 
64
75
#include "thr_lock.h"
65
 
#include "drizzled/internal/m_string.h"
 
76
#include <mystrings/m_string.h>
66
77
#include <errno.h>
67
 
#include <list>
68
78
 
69
79
#if TIME_WITH_SYS_TIME
70
80
# include <sys/time.h>
75
85
# else
76
86
#  include <time.h>
77
87
# endif
78
 
#endif
 
88
#endif  
79
89
 
80
90
#include <drizzled/util/test.h>
81
91
 
82
 
#include <boost/interprocess/sync/lock_options.hpp>
83
 
 
84
 
using namespace std;
85
 
 
86
 
namespace drizzled
 
92
bool thr_lock_inited=0;
 
93
uint32_t locks_immediate = 0L, locks_waited = 0L;
 
94
ulong table_lock_wait_timeout;
 
95
enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
 
96
 
 
97
/* The following constants are only for debug output */
 
98
#define MAX_THREADS 100
 
99
#define MAX_LOCKS   100
 
100
 
 
101
 
 
102
LIST *thr_lock_thread_list;                     /* List of threads in use */
 
103
ulong max_write_lock_count= ~(ulong) 0L;
 
104
 
 
105
static inline pthread_cond_t *get_cond(void)
87
106
{
88
 
 
89
 
uint64_t table_lock_wait_timeout;
90
 
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
91
 
 
92
 
 
93
 
uint64_t max_write_lock_count= UINT64_MAX;
94
 
 
95
 
static void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count);
 
107
  return &my_thread_var->suspend;
 
108
}
96
109
 
97
110
/*
98
111
** For the future (now the thread specific cond is allocated by my_pthread.c)
99
112
*/
100
113
 
 
114
bool init_thr_lock()
 
115
{
 
116
  thr_lock_inited=1;
 
117
  return 0;
 
118
}
 
119
 
101
120
static inline bool
102
121
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
103
122
{
104
123
  return rhs == lhs;
105
124
}
106
125
 
 
126
#ifdef EXTRA_DEBUG
 
127
#define MAX_FOUND_ERRORS        10              /* Report 10 first errors */
 
128
static uint32_t found_errors=0;
 
129
 
 
130
static int check_lock(struct st_lock_list *list, const char* lock_type,
 
131
                      const char *where, bool same_owner, bool no_cond)
 
132
{
 
133
  THR_LOCK_DATA *data,**prev;
 
134
  uint32_t count=0;
 
135
  THR_LOCK_OWNER *first_owner;
 
136
 
 
137
  prev= &list->data;
 
138
  if (list->data)
 
139
  {
 
140
    enum thr_lock_type last_lock_type=list->data->type;
 
141
 
 
142
    if (same_owner && list->data)
 
143
      first_owner= list->data->owner;
 
144
    for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next)
 
145
    {
 
146
      if (data->type != last_lock_type)
 
147
        last_lock_type=TL_IGNORE;
 
148
      if (data->prev != prev)
 
149
      {
 
150
        fprintf(stderr,
 
151
                "Warning: prev link %d didn't point at previous lock at %s: %s\n",
 
152
                count, lock_type, where);
 
153
        return 1;
 
154
      }
 
155
      if (same_owner &&
 
156
          !thr_lock_owner_equal(data->owner, first_owner) &&
 
157
          last_lock_type != TL_WRITE_ALLOW_WRITE)
 
158
      {
 
159
        fprintf(stderr,
 
160
                "Warning: Found locks from different threads in %s: %s\n",
 
161
                lock_type,where);
 
162
        return 1;
 
163
      }
 
164
      if (no_cond && data->cond)
 
165
      {
 
166
        fprintf(stderr,
 
167
                "Warning: Found active lock with not reset cond %s: %s\n",
 
168
                lock_type,where);
 
169
        return 1;
 
170
      }
 
171
      prev= &data->next;
 
172
    }
 
173
    if (data)
 
174
    {
 
175
      fprintf(stderr,"Warning: found too many locks at %s: %s\n",
 
176
              lock_type,where);
 
177
      return 1;
 
178
    }
 
179
  }
 
180
  if (prev != list->last)
 
181
  {
 
182
    fprintf(stderr,"Warning: last didn't point at last lock at %s: %s\n",
 
183
            lock_type, where);
 
184
    return 1;
 
185
  }
 
186
  return 0;
 
187
}
 
188
 
 
189
 
 
190
static void check_locks(THR_LOCK *lock, const char *where,
 
191
                        bool allow_no_locks)
 
192
{
 
193
  uint32_t old_found_errors=found_errors;
 
194
 
 
195
  if (found_errors < MAX_FOUND_ERRORS)
 
196
  {
 
197
    if (check_lock(&lock->write,"write",where,1,1) |
 
198
        check_lock(&lock->write_wait,"write_wait",where,0,0) |
 
199
        check_lock(&lock->read,"read",where,0,1) |
 
200
        check_lock(&lock->read_wait,"read_wait",where,0,0))
 
201
      found_errors++;
 
202
 
 
203
    if (found_errors < MAX_FOUND_ERRORS)
 
204
    {
 
205
      uint32_t count=0;
 
206
      THR_LOCK_DATA *data;
 
207
      for (data=lock->read.data ; data ; data=data->next)
 
208
      {
 
209
        if ((int) data->type == (int) TL_READ_NO_INSERT)
 
210
          count++;
 
211
        /* Protect against infinite loop. */
 
212
        assert(count <= lock->read_no_write_count);
 
213
      }
 
214
      if (count != lock->read_no_write_count)
 
215
      {
 
216
        found_errors++;
 
217
        fprintf(stderr,
 
218
                "Warning at '%s': Locks read_no_write_count was %u when it should have been %u\n", where, lock->read_no_write_count,count);
 
219
      }      
 
220
 
 
221
      if (!lock->write.data)
 
222
      {
 
223
        if (!allow_no_locks && !lock->read.data &&
 
224
            (lock->write_wait.data || lock->read_wait.data))
 
225
        {
 
226
          found_errors++;
 
227
          fprintf(stderr,
 
228
                  "Warning at '%s': No locks in use but locks are in wait queue\n",
 
229
                  where);
 
230
        }
 
231
        if (!lock->write_wait.data)
 
232
        {
 
233
          if (!allow_no_locks && lock->read_wait.data)
 
234
          {
 
235
            found_errors++;
 
236
            fprintf(stderr,
 
237
                    "Warning at '%s': No write locks and waiting read locks\n",
 
238
                    where);
 
239
          }
 
240
        }
 
241
        else
 
242
        {
 
243
          if (!allow_no_locks &&
 
244
              (((lock->write_wait.data->type == TL_WRITE_CONCURRENT_INSERT ||
 
245
                 lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE) &&
 
246
                !lock->read_no_write_count) ||
 
247
               lock->write_wait.data->type == TL_WRITE_ALLOW_READ ||
 
248
               (lock->write_wait.data->type == TL_WRITE_DELAYED &&
 
249
                !lock->read.data)))
 
250
          {
 
251
            found_errors++;
 
252
            fprintf(stderr,
 
253
                    "Warning at '%s': Write lock %d waiting while no exclusive read locks\n",where,(int) lock->write_wait.data->type);
 
254
          }
 
255
        }             
 
256
      }
 
257
      else
 
258
      {                                         /* Have write lock */
 
259
        if (lock->write_wait.data)
 
260
        {
 
261
          if (!allow_no_locks && 
 
262
              lock->write.data->type == TL_WRITE_ALLOW_WRITE &&
 
263
              lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE)
 
264
          {
 
265
            found_errors++;
 
266
            fprintf(stderr,
 
267
                    "Warning at '%s': Found WRITE_ALLOW_WRITE lock waiting for WRITE_ALLOW_WRITE lock\n",
 
268
                    where);
 
269
          }
 
270
        }
 
271
        if (lock->read.data)
 
272
        {
 
273
          if (!thr_lock_owner_equal(lock->write.data->owner,
 
274
                                    lock->read.data->owner) &&
 
275
              ((lock->write.data->type > TL_WRITE_DELAYED &&
 
276
                lock->write.data->type != TL_WRITE_ONLY) ||
 
277
               ((lock->write.data->type == TL_WRITE_CONCURRENT_INSERT ||
 
278
                 lock->write.data->type == TL_WRITE_ALLOW_WRITE) &&
 
279
                lock->read_no_write_count)))
 
280
          {
 
281
            found_errors++;
 
282
            fprintf(stderr,
 
283
                    "Warning at '%s': Found lock of type %d that is write and read locked\n",
 
284
                    where, lock->write.data->type);
 
285
          }
 
286
        }
 
287
        if (lock->read_wait.data)
 
288
        {
 
289
          if (!allow_no_locks && lock->write.data->type <= TL_WRITE_DELAYED &&
 
290
              lock->read_wait.data->type <= TL_READ_HIGH_PRIORITY)
 
291
          {
 
292
            found_errors++;
 
293
            fprintf(stderr,
 
294
                    "Warning at '%s': Found read lock of type %d waiting for write lock of type %d\n",
 
295
                    where,
 
296
                    (int) lock->read_wait.data->type,
 
297
                    (int) lock->write.data->type);
 
298
          }
 
299
        }
 
300
      }
 
301
    }
 
302
  }
 
303
  return;
 
304
}
 
305
 
 
306
#else /* EXTRA_DEBUG */
 
307
#define check_locks(A,B,C)
 
308
#endif
 
309
 
107
310
 
108
311
        /* Initialize a lock */
109
312
 
110
313
void thr_lock_init(THR_LOCK *lock)
111
314
{
112
 
  lock->init();
 
315
  memset(lock, 0, sizeof(*lock));
 
316
  pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
113
317
  lock->read.last= &lock->read.data;
114
318
  lock->read_wait.last= &lock->read_wait.data;
115
319
  lock->write_wait.last= &lock->write_wait.data;
116
320
  lock->write.last= &lock->write.data;
117
 
}
118
 
 
119
 
 
120
 
void THR_LOCK_INFO::init()
121
 
{
122
 
  internal::st_my_thread_var *tmp= my_thread_var;
123
 
  thread= tmp->pthread_self;
124
 
  thread_id= tmp->id;
125
 
  n_cursors= 0;
 
321
 
 
322
  pthread_mutex_lock(&THR_LOCK_lock);           /* Add to locks in use */
 
323
  lock->list.data=(void*) lock;
 
324
  thr_lock_thread_list=list_add(thr_lock_thread_list,&lock->list);
 
325
  pthread_mutex_unlock(&THR_LOCK_lock);
 
326
  return;
 
327
}
 
328
 
 
329
 
 
330
void thr_lock_delete(THR_LOCK *lock)
 
331
{
 
332
  pthread_mutex_destroy(&lock->mutex);
 
333
  pthread_mutex_lock(&THR_LOCK_lock);
 
334
  thr_lock_thread_list=list_delete(thr_lock_thread_list,&lock->list);
 
335
  pthread_mutex_unlock(&THR_LOCK_lock);
 
336
  return;
 
337
}
 
338
 
 
339
 
 
340
void thr_lock_info_init(THR_LOCK_INFO *info)
 
341
{
 
342
  struct st_my_thread_var *tmp= my_thread_var;
 
343
  info->thread=    tmp->pthread_self;
 
344
  info->thread_id= tmp->id;
 
345
  info->n_cursors= 0;
126
346
}
127
347
 
128
348
        /* Initialize a lock instance */
129
349
 
130
 
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
 
350
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
131
351
{
132
 
  lock= lock_arg;
133
 
  type= TL_UNLOCK;
134
 
  owner= NULL;                               /* no owner yet */
135
 
  status_param= param_arg;
136
 
  cond= NULL;
 
352
  data->lock=lock;
 
353
  data->type=TL_UNLOCK;
 
354
  data->owner= 0;                               /* no owner yet */
 
355
  data->status_param=param;
 
356
  data->cond=0;
137
357
}
138
358
 
139
359
 
143
363
  for ( ; data ; data=data->next)
144
364
  {
145
365
    if (thr_lock_owner_equal(data->owner, owner))
146
 
      return true;                                      /* Already locked by thread */
147
 
  }
148
 
  return false;
149
 
}
 
366
      return 1;                                 /* Already locked by thread */
 
367
  }
 
368
  return 0;
 
369
}
 
370
 
 
371
static inline bool have_specific_lock(THR_LOCK_DATA *data,
 
372
                                         enum thr_lock_type type)
 
373
{
 
374
  for ( ; data ; data=data->next)
 
375
  {
 
376
    if (data->type == type)
 
377
      return 1;
 
378
  }
 
379
  return 0;
 
380
}
 
381
 
150
382
 
151
383
static void wake_up_waiters(THR_LOCK *lock);
152
384
 
153
385
 
154
 
static enum enum_thr_lock_result wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, bool in_wait_list)
 
386
static enum enum_thr_lock_result
 
387
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
 
388
              bool in_wait_list)
155
389
{
156
 
  Session *session= current_session;
157
 
  internal::st_my_thread_var *thread_var= session->getThreadVar();
158
 
 
159
 
  boost::condition_variable_any *cond= &thread_var->suspend;
 
390
  struct st_my_thread_var *thread_var= my_thread_var;
 
391
  pthread_cond_t *cond= &thread_var->suspend;
 
392
  struct timespec wait_timeout;
160
393
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
161
394
  bool can_deadlock= test(data->owner->info->n_cursors);
162
395
 
167
400
    wait->last= &data->next;
168
401
  }
169
402
 
170
 
  current_global_counters.locks_waited++;
 
403
  statistic_increment(locks_waited, &THR_LOCK_lock);
171
404
 
172
405
  /* Set up control struct to allow others to abort locks */
173
 
  thread_var->current_mutex= data->lock->native_handle();
174
 
  thread_var->current_cond=  &thread_var->suspend;
175
 
  data->cond= &thread_var->suspend;;
 
406
  thread_var->current_mutex= &data->lock->mutex;
 
407
  thread_var->current_cond=  cond;
 
408
  data->cond= cond;
176
409
 
 
410
  if (can_deadlock)
 
411
    set_timespec(wait_timeout, table_lock_wait_timeout);
177
412
  while (!thread_var->abort || in_wait_list)
178
413
  {
179
 
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
180
 
 
181
 
    if (can_deadlock)
182
 
    {
183
 
      boost::xtime xt; 
184
 
      xtime_get(&xt, boost::TIME_UTC); 
185
 
      xt.sec += table_lock_wait_timeout; 
186
 
      if (not cond->timed_wait(scoped, xt))
187
 
      {
188
 
        result= THR_LOCK_WAIT_TIMEOUT;
189
 
        scoped.release();
190
 
        break;
191
 
      }
192
 
    }
193
 
    else
194
 
    {
195
 
      cond->wait(scoped);
196
 
    }
 
414
    int rc= (can_deadlock ?
 
415
             pthread_cond_timedwait(cond, &data->lock->mutex,
 
416
                                    &wait_timeout) :
 
417
             pthread_cond_wait(cond, &data->lock->mutex));
197
418
    /*
198
419
      We must break the wait if one of the following occurs:
199
420
      - the connection has been aborted (!thread_var->abort), but
207
428
      Order of checks below is important to not report about timeout
208
429
      if the predicate is true.
209
430
    */
210
 
    if (data->cond == NULL)
211
 
    {
212
 
      scoped.release();
213
 
      break;
214
 
    }
215
 
    scoped.release();
 
431
    if (data->cond == 0)
 
432
    {
 
433
      break;
 
434
    }
 
435
    if (rc == ETIMEDOUT || rc == ETIME)
 
436
    {
 
437
      /* purecov: begin inspected */
 
438
      result= THR_LOCK_WAIT_TIMEOUT;
 
439
      break;
 
440
      /* purecov: end */
 
441
    }
216
442
  }
217
443
  if (data->cond || data->type == TL_UNLOCK)
218
444
  {
223
449
      else
224
450
        wait->last=data->prev;
225
451
      data->type= TL_UNLOCK;                    /* No lock */
 
452
      check_locks(data->lock, "killed or timed out wait_for_lock", 1);
226
453
      wake_up_waiters(data->lock);
227
454
    }
 
455
    else
 
456
    {
 
457
      check_locks(data->lock, "aborted wait_for_lock", 0);
 
458
    }
228
459
  }
229
460
  else
230
461
  {
231
462
    result= THR_LOCK_SUCCESS;
 
463
    if (data->lock->get_status)
 
464
      (*data->lock->get_status)(data->status_param, 0);
 
465
    check_locks(data->lock,"got wait_for_lock",0);
232
466
  }
233
 
  data->lock->unlock();
 
467
  pthread_mutex_unlock(&data->lock->mutex);
234
468
 
235
469
  /* The following must be done after unlock of lock->mutex */
236
 
  boost_unique_lock_t scopedLock(thread_var->mutex);
237
 
  thread_var->current_mutex= NULL;
238
 
  thread_var->current_cond= NULL;
 
470
  pthread_mutex_lock(&thread_var->mutex);
 
471
  thread_var->current_mutex= 0;
 
472
  thread_var->current_cond=  0;
 
473
  pthread_mutex_unlock(&thread_var->mutex);
239
474
  return(result);
240
475
}
241
476
 
242
477
 
243
 
static enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
 
478
enum enum_thr_lock_result
 
479
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
 
480
         enum thr_lock_type lock_type)
244
481
{
245
 
  THR_LOCK *lock= data->lock;
 
482
  THR_LOCK *lock=data->lock;
246
483
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
247
484
  struct st_lock_list *wait_queue;
248
485
  THR_LOCK_DATA *lock_owner;
251
488
  data->cond=0;                                 /* safety */
252
489
  data->type=lock_type;
253
490
  data->owner= owner;                           /* Must be reset ! */
254
 
  lock->lock();
 
491
  pthread_mutex_lock(&lock->mutex);
 
492
  check_locks(lock,(uint) lock_type <= (uint) TL_READ_NO_INSERT ?
 
493
              "enter read_lock" : "enter write_lock",0);
255
494
  if ((int) lock_type <= (int) TL_READ_NO_INSERT)
256
495
  {
257
496
    /* Request for READ lock */
267
506
      */
268
507
 
269
508
      if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
270
 
          (lock->write.data->type <= TL_WRITE_CONCURRENT_INSERT &&
271
 
           (((int) lock_type <= (int) TL_READ_WITH_SHARED_LOCKS) ||
 
509
          (lock->write.data->type <= TL_WRITE_DELAYED &&
 
510
           (((int) lock_type <= (int) TL_READ_HIGH_PRIORITY) ||
272
511
            (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT &&
273
512
             lock->write.data->type != TL_WRITE_ALLOW_READ))))
274
513
      {                                         /* Already got a write lock */
277
516
        lock->read.last= &data->next;
278
517
        if (lock_type == TL_READ_NO_INSERT)
279
518
          lock->read_no_write_count++;
280
 
        current_global_counters.locks_immediate++;
 
519
        check_locks(lock,"read lock with old write lock",0);
 
520
        if (lock->get_status)
 
521
          (*lock->get_status)(data->status_param, 0);
 
522
        statistic_increment(locks_immediate,&THR_LOCK_lock);
281
523
        goto end;
282
524
      }
283
525
      if (lock->write.data->type == TL_WRITE_ONLY)
289
531
      }
290
532
    }
291
533
    else if (!lock->write_wait.data ||
292
 
             lock->write_wait.data->type <= TL_WRITE_DEFAULT ||
 
534
             lock->write_wait.data->type <= TL_WRITE_LOW_PRIORITY ||
 
535
             lock_type == TL_READ_HIGH_PRIORITY ||
293
536
             have_old_read_lock(lock->read.data, data->owner))
294
537
    {                                           /* No important write-locks */
295
538
      (*lock->read.last)=data;                  /* Add to running FIFO */
296
539
      data->prev=lock->read.last;
297
540
      lock->read.last= &data->next;
 
541
      if (lock->get_status)
 
542
        (*lock->get_status)(data->status_param, 0);
298
543
      if (lock_type == TL_READ_NO_INSERT)
299
544
        lock->read_no_write_count++;
300
 
      current_global_counters.locks_immediate++;
 
545
      check_locks(lock,"read lock with no write locks",0);
 
546
      statistic_increment(locks_immediate,&THR_LOCK_lock);
301
547
      goto end;
302
548
    }
303
549
    /*
309
555
  }
310
556
  else                                          /* Request for WRITE lock */
311
557
  {
312
 
    if (lock_type == TL_WRITE_CONCURRENT_INSERT)
 
558
    if (lock_type == TL_WRITE_DELAYED)
 
559
    {
 
560
      if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY)
 
561
      {
 
562
        data->type=TL_UNLOCK;
 
563
        result= THR_LOCK_ABORTED;               /* Can't wait for this one */
 
564
        goto end;
 
565
      }
 
566
      /*
 
567
        if there is a TL_WRITE_ALLOW_READ lock, we have to wait for a lock
 
568
        (TL_WRITE_ALLOW_READ is used for ALTER TABLE in MySQL)
 
569
      */
 
570
      if ((!lock->write.data ||
 
571
           lock->write.data->type != TL_WRITE_ALLOW_READ) &&
 
572
          !have_specific_lock(lock->write_wait.data,TL_WRITE_ALLOW_READ) &&
 
573
          (lock->write.data || lock->read.data))
 
574
      {
 
575
        /* Add delayed write lock to write_wait queue, and return at once */
 
576
        (*lock->write_wait.last)=data;
 
577
        data->prev=lock->write_wait.last;
 
578
        lock->write_wait.last= &data->next;
 
579
        data->cond=get_cond();
 
580
        /*
 
581
          We don't have to do get_status here as we will do it when we change
 
582
          the delayed lock to a real write lock
 
583
        */
 
584
        statistic_increment(locks_immediate,&THR_LOCK_lock);
 
585
        goto end;
 
586
      }
 
587
    }
 
588
    else if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
313
589
      data->type=lock_type= thr_upgraded_concurrent_insert_lock;
314
590
 
315
591
    if (lock->write.data)                       /* If there is a write lock */
344
620
        (*lock->write.last)=data;       /* Add to running fifo */
345
621
        data->prev=lock->write.last;
346
622
        lock->write.last= &data->next;
347
 
        current_global_counters.locks_immediate++;
 
623
        check_locks(lock,"second write lock",0);
 
624
        if (data->lock->get_status)
 
625
          (*data->lock->get_status)(data->status_param, 0);
 
626
        statistic_increment(locks_immediate,&THR_LOCK_lock);
348
627
        goto end;
349
628
      }
350
629
    }
356
635
        if (lock_type == TL_WRITE_CONCURRENT_INSERT)
357
636
        {
358
637
          concurrent_insert= 1;
 
638
          if ((*lock->check_status)(data->status_param))
 
639
          {
 
640
            concurrent_insert= 0;
 
641
            data->type=lock_type= thr_upgraded_concurrent_insert_lock;
 
642
          }
359
643
        }
360
644
 
361
645
        if (!lock->read.data ||
362
 
            (lock_type <= TL_WRITE_CONCURRENT_INSERT &&
 
646
            (lock_type <= TL_WRITE_DELAYED &&
363
647
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
364
648
               lock_type != TL_WRITE_ALLOW_WRITE) ||
365
649
              !lock->read_no_write_count)))
367
651
          (*lock->write.last)=data;             /* Add as current write lock */
368
652
          data->prev=lock->write.last;
369
653
          lock->write.last= &data->next;
370
 
          current_global_counters.locks_immediate++;
 
654
          if (data->lock->get_status)
 
655
            (*data->lock->get_status)(data->status_param, concurrent_insert);
 
656
          check_locks(lock,"only write lock",0);
 
657
          statistic_increment(locks_immediate,&THR_LOCK_lock);
371
658
          goto end;
372
659
        }
373
660
      }
386
673
    result= THR_LOCK_DEADLOCK;
387
674
    goto end;
388
675
  }
389
 
 
390
676
  /* Can't get lock yet;  Wait for it */
391
677
  return(wait_for_lock(wait_queue, data, 0));
392
678
end:
393
 
  lock->unlock();
394
 
 
 
679
  pthread_mutex_unlock(&lock->mutex);
395
680
  return(result);
396
681
}
397
682
 
398
683
 
399
 
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
 
684
static inline void free_all_read_locks(THR_LOCK *lock,
 
685
                                       bool using_concurrent_insert)
400
686
{
401
 
  THR_LOCK_DATA *data= lock->read_wait.data;
 
687
  THR_LOCK_DATA *data=lock->read_wait.data;
 
688
 
 
689
  check_locks(lock,"before freeing read locks",1);
402
690
 
403
691
  /* move all locks from read_wait list to read list */
404
692
  (*lock->read.last)=data;
410
698
 
411
699
  do
412
700
  {
413
 
    boost::condition_variable_any *cond= data->cond;
 
701
    pthread_cond_t *cond=data->cond;
414
702
    if ((int) data->type == (int) TL_READ_NO_INSERT)
415
703
    {
416
704
      if (using_concurrent_insert)
417
705
      {
418
706
        /*
419
 
          We can't free this lock;
 
707
          We can't free this lock; 
420
708
          Link lock away from read chain back into read_wait chain
421
709
        */
422
710
        if (((*data->prev)=data->next))
429
717
        continue;
430
718
      }
431
719
      lock->read_no_write_count++;
432
 
    }
433
 
    data->cond= NULL;                           /* Mark thread free */
434
 
    cond->notify_one();
 
720
    }      
 
721
    data->cond=0;                               /* Mark thread free */
 
722
    pthread_cond_signal(cond);
435
723
  } while ((data=data->next));
436
724
  *lock->read_wait.last=0;
437
725
  if (!lock->read_wait.data)
438
726
    lock->write_lock_count=0;
 
727
  check_locks(lock,"after giving read locks",0);
439
728
}
440
729
 
441
 
/* Unlock lock and free next thread on same lock */
 
730
        /* Unlock lock and free next thread on same lock */
442
731
 
443
 
static void thr_unlock(THR_LOCK_DATA *data)
 
732
void thr_unlock(THR_LOCK_DATA *data)
444
733
{
445
734
  THR_LOCK *lock=data->lock;
446
735
  enum thr_lock_type lock_type=data->type;
447
 
  lock->lock();
 
736
  pthread_mutex_lock(&lock->mutex);
 
737
  check_locks(lock,"start of release lock",0);
448
738
 
449
739
  if (((*data->prev)=data->next))               /* remove from lock-list */
450
740
    data->next->prev= data->prev;
451
741
  else if (lock_type <= TL_READ_NO_INSERT)
452
742
    lock->read.last=data->prev;
 
743
  else if (lock_type == TL_WRITE_DELAYED && data->cond)
 
744
  {
 
745
    /*
 
746
      This only happens in extreme circumstances when a 
 
747
      write delayed lock that is waiting for a lock is released
 
748
    */
 
749
    lock->write_wait.last=data->prev;           /* Put it on wait queue */
 
750
  }
453
751
  else
454
752
    lock->write.last=data->prev;
455
753
  if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
456
 
  { }
 
754
  {
 
755
    if (lock->update_status)
 
756
      (*lock->update_status)(data->status_param);
 
757
  }
457
758
  else
458
 
  { }
 
759
  {
 
760
    if (lock->restore_status)
 
761
      (*lock->restore_status)(data->status_param);
 
762
  }
459
763
  if (lock_type == TL_READ_NO_INSERT)
460
764
    lock->read_no_write_count--;
461
765
  data->type=TL_UNLOCK;                         /* Mark unlocked */
 
766
  check_locks(lock,"after releasing lock",1);
462
767
  wake_up_waiters(lock);
463
 
  lock->unlock();
 
768
  pthread_mutex_unlock(&lock->mutex);
 
769
  return;
464
770
}
465
771
 
466
772
 
484
790
    {
485
791
      /* Release write-locks with TL_WRITE or TL_WRITE_ONLY priority first */
486
792
      if (data &&
487
 
          (!lock->read_wait.data || lock->read_wait.data->type <= TL_READ_WITH_SHARED_LOCKS))
 
793
          (data->type != TL_WRITE_LOW_PRIORITY || !lock->read_wait.data ||
 
794
           lock->read_wait.data->type < TL_READ_HIGH_PRIORITY))
488
795
      {
489
796
        if (lock->write_lock_count++ > max_write_lock_count)
490
797
        {
506
813
          data->prev=lock->write.last;
507
814
          data->next=0;
508
815
          lock->write.last= &data->next;
509
 
 
 
816
          if (data->type == TL_WRITE_CONCURRENT_INSERT &&
 
817
              (*lock->check_status)(data->status_param))
 
818
            data->type=TL_WRITE;                        /* Upgrade lock */
510
819
          {
511
 
            boost::condition_variable_any *cond= data->cond;
512
 
            data->cond= NULL;                           /* Mark thread free */
513
 
            cond->notify_one(); /* Start waiting thred */
 
820
            pthread_cond_t *cond=data->cond;
 
821
            data->cond=0;                               /* Mark thread free */
 
822
            pthread_cond_signal(cond);  /* Start waiting thread */
514
823
          }
515
824
          if (data->type != TL_WRITE_ALLOW_WRITE ||
516
825
              !lock->write_wait.data ||
518
827
            break;
519
828
          data=lock->write_wait.data;           /* Free this too */
520
829
        }
521
 
        if (data->type >= TL_WRITE)
 
830
        if (data->type >= TL_WRITE_LOW_PRIORITY)
522
831
          goto end;
523
832
        /* Release possible read locks together with the write lock */
524
833
      }
529
838
                             data->type == TL_WRITE_ALLOW_WRITE));
530
839
    }
531
840
    else if (data &&
532
 
             (lock_type=data->type) <= TL_WRITE_CONCURRENT_INSERT &&
 
841
             (lock_type=data->type) <= TL_WRITE_DELAYED &&
533
842
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
534
843
               lock_type != TL_WRITE_ALLOW_WRITE) ||
535
844
              !lock->read_no_write_count))
536
845
    {
 
846
      /*
 
847
        For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
 
848
        start WRITE locks together with the READ locks
 
849
      */
 
850
      if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
 
851
          (*lock->check_status)(data->status_param))
 
852
      {
 
853
        data->type=TL_WRITE;                    /* Upgrade lock */
 
854
        if (lock->read_wait.data)
 
855
          free_all_read_locks(lock,0);
 
856
        goto end;
 
857
      }
537
858
      do {
538
 
        boost::condition_variable_any *cond= data->cond;
 
859
        pthread_cond_t *cond=data->cond;
539
860
        if (((*data->prev)=data->next))         /* remove from wait-list */
540
861
          data->next->prev= data->prev;
541
862
        else
544
865
        data->prev=lock->write.last;
545
866
        lock->write.last= &data->next;
546
867
        data->next=0;                           /* Only one write lock */
547
 
        data->cond= NULL;                               /* Mark thread free */
548
 
        cond->notify_one(); /* Start waiting thread */
 
868
        data->cond=0;                           /* Mark thread free */
 
869
        pthread_cond_signal(cond);      /* Start waiting thread */
549
870
      } while (lock_type == TL_WRITE_ALLOW_WRITE &&
550
871
               (data=lock->write_wait.data) &&
551
872
               data->type == TL_WRITE_ALLOW_WRITE);
555
876
                             lock_type == TL_WRITE_ALLOW_WRITE));
556
877
    }
557
878
    else if (!data && lock->read_wait.data)
558
 
    {
559
879
      free_all_read_locks(lock,0);
560
 
    }
561
880
  }
562
881
end:
 
882
  check_locks(lock, "after waking up waiters", 0);
563
883
  return;
564
884
}
565
885
 
571
891
*/
572
892
 
573
893
 
574
 
/*
  Ordering predicate for two THR_LOCK_DATA pointers: compares the lock
  address minus the lock type.  For the same lock, the entry with the
  higher type therefore compares as "less".
  Both arguments are fully parenthesized so that any pointer expression
  (for example &x) can be passed safely.
*/
#define LOCK_CMP(A,B) ((unsigned char*) ((A)->lock) - (uint32_t) ((A)->type) < (unsigned char*) ((B)->lock) - (uint32_t) ((B)->type))
 
894
/*
  Ordering predicate for two THR_LOCK_DATA pointers: compares the lock
  address minus the lock type.  For the same lock, the entry with the
  higher type therefore compares as "less".
  Both arguments are fully parenthesized so that any pointer expression
  (for example &x) can be passed safely.
*/
#define LOCK_CMP(A,B) ((unsigned char*) ((A)->lock) - (uint) ((A)->type) < (unsigned char*) ((B)->lock) - (uint) ((B)->type))
575
895
 
576
896
static void sort_locks(THR_LOCK_DATA **data,uint32_t count)
577
897
{
606
926
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
607
927
    if (result != THR_LOCK_SUCCESS)
608
928
    {                                           /* Aborted */
609
 
      thr_multi_unlock(data,(uint32_t) (pos-data));
 
929
      thr_multi_unlock(data,(uint) (pos-data));
610
930
      return(result);
611
931
    }
 
932
#ifdef MAIN
 
933
    printf("Thread: %s  Got lock: 0x%lx  type: %d\n",my_thread_name(),
 
934
           (long) pos[0]->lock, pos[0]->type); fflush(stdout);
 
935
#endif
612
936
  }
613
937
  /*
614
938
    Ensure that all get_locks() have the same status
623
947
    do
624
948
    {
625
949
      pos--;
626
 
      last_lock=(*pos);
 
950
      if (last_lock->lock == (*pos)->lock &&
 
951
          last_lock->lock->copy_status)
 
952
      {
 
953
        if (last_lock->type <= TL_READ_NO_INSERT)
 
954
        {
 
955
          THR_LOCK_DATA **read_lock;
 
956
          /*
 
957
            If we are locking the same table with read locks we must ensure
 
958
            that all tables share the status of the last write lock or
 
959
            the same read lock.
 
960
          */
 
961
          for (;
 
962
               (*pos)->type <= TL_READ_NO_INSERT &&
 
963
                 pos != data &&
 
964
                 pos[-1]->lock == (*pos)->lock ;
 
965
               pos--) ;
 
966
 
 
967
          read_lock = pos+1;
 
968
          do
 
969
          {
 
970
            (last_lock->lock->copy_status)((*read_lock)->status_param,
 
971
                                           (*pos)->status_param);
 
972
          } while (*(read_lock++) != last_lock);
 
973
          last_lock= (*pos);                    /* Point at last write lock */
 
974
        }
 
975
        else
 
976
          (*last_lock->lock->copy_status)((*pos)->status_param,
 
977
                                          last_lock->status_param);
 
978
      }
 
979
      else
 
980
        last_lock=(*pos);
627
981
    } while (pos != data);
628
982
  }
629
983
#endif
632
986
 
633
987
  /* free all locks */
634
988
 
635
 
static void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count)
 
989
void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count)
636
990
{
637
991
  THR_LOCK_DATA **pos,**end;
638
992
 
639
993
  for (pos=data,end=data+count; pos < end ; pos++)
640
994
  {
 
995
#ifdef MAIN
 
996
    printf("Thread: %s  Rel lock: 0x%lx  type: %d\n",
 
997
           my_thread_name(), (long) pos[0]->lock, pos[0]->type);
 
998
    fflush(stdout);
 
999
#endif
641
1000
    if ((*pos)->type != TL_UNLOCK)
642
1001
      thr_unlock(*pos);
643
1002
  }
644
1003
  return;
645
1004
}
646
1005
 
647
 
void DrizzleLock::unlock(uint32_t count)
648
 
{
649
 
  THR_LOCK_DATA **pos,**end;
650
 
 
651
 
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
652
 
  {
653
 
    if ((*pos)->type != TL_UNLOCK)
654
 
      thr_unlock(*pos);
655
 
  }
656
 
}
657
 
 
658
1006
/*
659
1007
  Abort all threads waiting for a lock. The lock will be upgraded to
660
1008
  TL_WRITE_ONLY to abort any new accesses to the lock
661
1009
*/
662
1010
 
663
 
void THR_LOCK::abort_locks()
 
1011
void thr_abort_locks(THR_LOCK *lock, bool upgrade_lock)
664
1012
{
665
 
  boost_unique_lock_t scopedLock(mutex);
 
1013
  THR_LOCK_DATA *data;
 
1014
  pthread_mutex_lock(&lock->mutex);
666
1015
 
667
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
1016
  for (data=lock->read_wait.data; data ; data=data->next)
668
1017
  {
669
 
    local_data->type= TL_UNLOCK;                        /* Mark killed */
 
1018
    data->type=TL_UNLOCK;                       /* Mark killed */
670
1019
    /* It's safe to signal the cond first: we're still holding the mutex. */
671
 
    local_data->cond->notify_one();
672
 
    local_data->cond= NULL;                             /* Removed from list */
 
1020
    pthread_cond_signal(data->cond);
 
1021
    data->cond=0;                               /* Removed from list */
673
1022
  }
674
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
1023
  for (data=lock->write_wait.data; data ; data=data->next)
675
1024
  {
676
 
    local_data->type= TL_UNLOCK;
677
 
    local_data->cond->notify_one();
678
 
    local_data->cond= NULL;
 
1025
    data->type=TL_UNLOCK;
 
1026
    pthread_cond_signal(data->cond);
 
1027
    data->cond=0;
679
1028
  }
680
 
  read_wait.last= &read_wait.data;
681
 
  write_wait.last= &write_wait.data;
682
 
  read_wait.data= write_wait.data=0;
683
 
  if (write.data)
684
 
    write.data->type=TL_WRITE_ONLY;
 
1029
  lock->read_wait.last= &lock->read_wait.data;
 
1030
  lock->write_wait.last= &lock->write_wait.data;
 
1031
  lock->read_wait.data=lock->write_wait.data=0;
 
1032
  if (upgrade_lock && lock->write.data)
 
1033
    lock->write.data->type=TL_WRITE_ONLY;
 
1034
  pthread_mutex_unlock(&lock->mutex);
 
1035
  return;
685
1036
}
686
1037
 
687
1038
 
691
1042
  This is used to abort all locks for a specific thread
692
1043
*/
693
1044
 
694
 
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
 
1045
bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
695
1046
{
 
1047
  THR_LOCK_DATA *data;
696
1048
  bool found= false;
697
1049
 
698
 
  boost_unique_lock_t scopedLock(mutex);
699
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
1050
  pthread_mutex_lock(&lock->mutex);
 
1051
  for (data= lock->read_wait.data; data ; data= data->next)
700
1052
  {
701
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
1053
    if (data->owner->info->thread_id == thread_id)    /* purecov: tested */
702
1054
    {
703
 
      local_data->type= TL_UNLOCK;                      /* Mark killed */
 
1055
      data->type= TL_UNLOCK;                    /* Mark killed */
704
1056
      /* It's safe to signal the cond first: we're still holding the mutex. */
705
1057
      found= true;
706
 
      local_data->cond->notify_one();
707
 
      local_data->cond= 0;                              /* Removed from list */
 
1058
      pthread_cond_signal(data->cond);
 
1059
      data->cond= 0;                            /* Removed from list */
708
1060
 
709
 
      if (((*local_data->prev)= local_data->next))
710
 
        local_data->next->prev= local_data->prev;
 
1061
      if (((*data->prev)= data->next))
 
1062
        data->next->prev= data->prev;
711
1063
      else
712
 
        read_wait.last= local_data->prev;
 
1064
        lock->read_wait.last= data->prev;
713
1065
    }
714
1066
  }
715
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
1067
  for (data= lock->write_wait.data; data ; data= data->next)
716
1068
  {
717
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
1069
    if (data->owner->info->thread_id == thread_id) /* purecov: tested */
718
1070
    {
719
 
      local_data->type= TL_UNLOCK;
 
1071
      data->type= TL_UNLOCK;
720
1072
      found= true;
721
 
      local_data->cond->notify_one();
722
 
      local_data->cond= NULL;
723
 
 
724
 
      if (((*local_data->prev)= local_data->next))
725
 
        local_data->next->prev= local_data->prev;
726
 
      else
727
 
        write_wait.last= local_data->prev;
728
 
    }
729
 
  }
730
 
  wake_up_waiters(this);
731
 
 
732
 
  return found;
733
 
}
734
 
 
735
 
} /* namespace drizzled */
 
1073
      pthread_cond_signal(data->cond);
 
1074
      data->cond= 0;
 
1075
 
 
1076
      if (((*data->prev)= data->next))
 
1077
        data->next->prev= data->prev;
 
1078
      else
 
1079
        lock->write_wait.last= data->prev;
 
1080
    }
 
1081
  }
 
1082
  wake_up_waiters(lock);
 
1083
  pthread_mutex_unlock(&lock->mutex);
 
1084
  return(found);
 
1085
}
 
1086
 
 
1087
 
 
1088
/*
 
1089
  Downgrade a WRITE_* to a lower WRITE level
 
1090
  SYNOPSIS
 
1091
    thr_downgrade_write_lock()
 
1092
    in_data                   Lock data of thread downgrading its lock
 
1093
    new_lock_type             New write lock type
 
1094
  RETURN VALUE
 
1095
    NONE
 
1096
  DESCRIPTION
 
1097
    This can be used to downgrade a lock already owned. When the downgrade
 
1098
    occurs also other waiters, both readers and writers can be allowed to
 
1099
    start.
 
1100
    The previous lock is often TL_WRITE_ONLY but can also be
 
1101
    TL_WRITE and TL_WRITE_ALLOW_READ. The normal downgrade variants are
 
1102
    TL_WRITE_ONLY => TL_WRITE_ALLOW_READ After a short exclusive lock
 
1103
    TL_WRITE_ALLOW_READ => TL_WRITE_ALLOW_WRITE After discovering that the
 
1104
    operation didn't need such a high lock.
 
1105
    TL_WRITE_ONLY => TL_WRITE after a short exclusive lock while holding a
 
1106
    write table lock
 
1107
    TL_WRITE_ONLY => TL_WRITE_ALLOW_WRITE After a short exclusive lock after
 
1108
    already earlier having downgraded lock to TL_WRITE_ALLOW_WRITE
 
1109
    The implementation is conservative and rather don't start rather than
 
1110
    go on unknown paths to start, the common cases are handled.
 
1111
 
 
1112
    NOTE:
 
1113
    In its current implementation it is only allowed to downgrade from
 
1114
    TL_WRITE_ONLY. In this case there are no waiters. Thus no wake up
 
1115
    logic is required.
 
1116
*/
 
1117
 
 
1118
/*
  Replace the type of an already held write lock with new_lock_type.

  The type is changed while holding the mutex of the lock that in_data
  belongs to.  This function itself wakes up no waiters.
*/
void thr_downgrade_write_lock(THR_LOCK_DATA *in_data,
                              enum thr_lock_type new_lock_type)
{
  THR_LOCK *const owning_lock= in_data->lock;

  pthread_mutex_lock(&owning_lock->mutex);
  in_data->type= new_lock_type;
  check_locks(owning_lock,"after downgrading lock",0);
  pthread_mutex_unlock(&owning_lock->mutex);
}
 
1130
 
 
1131
/* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */
 
1132
 
 
1133
/*
  Upgrade a delayed write lock to TL_WRITE.

  RETURN VALUE
    If the lock type is TL_UNLOCK or already >= TL_WRITE_LOW_PRIORITY,
    nothing is changed and the result is true only for TL_UNLOCK (aborted).
    If the lock is already granted (data->cond is not set) and there are
    no read locks, 0 is returned.
    Otherwise the result of wait_for_lock() is returned.
*/
bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
{
  THR_LOCK *lock=data->lock;

  pthread_mutex_lock(&lock->mutex);
  if (data->type == TL_UNLOCK || data->type >= TL_WRITE_LOW_PRIORITY)
  {
    /*
      Sample the type while the mutex is still held; reading it after the
      unlock would race with threads that change it under the mutex.
    */
    bool aborted= (data->type == TL_UNLOCK);    /* Test if Aborted */
    pthread_mutex_unlock(&lock->mutex);
    return(aborted);
  }
  check_locks(lock,"before upgrading lock",0);
  /* TODO:  Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */
  data->type=TL_WRITE;                          /* Upgrade lock */

  /* Check if someone has given us the lock */
  if (!data->cond)
  {
    if (!lock->read.data)                       /* No read locks */
    {                                           /* We have the lock */
      if (data->lock->get_status)
        (*data->lock->get_status)(data->status_param, 0);
      pthread_mutex_unlock(&lock->mutex);
      return(0);
    }

    if (((*data->prev)=data->next))             /* remove from lock-list */
      data->next->prev= data->prev;
    else
      lock->write.last=data->prev;

    if ((data->next=lock->write_wait.data))     /* Put first in lock_list */
      data->next->prev= &data->next;
    else
      lock->write_wait.last= &data->next;
    data->prev= &lock->write_wait.data;
    lock->write_wait.data=data;
    check_locks(lock,"upgrading lock",0);
  }
  else
  {
    check_locks(lock,"waiting for lock",0);
  }
  /*
    NOTE(review): the mutex is still held at this point; presumably
    wait_for_lock() releases it - confirm against its definition.
  */
  return(wait_for_lock(&lock->write_wait,data,1));
}
 
1177
 
 
1178
 
 
1179
/* Downgrade a WRITE lock to a WRITE_DELAY lock if there are pending read locks */
 
1180
 
 
1181
/*
  Give waiting readers a chance to run in front of a held write lock.

  If no read lock is waiting, nothing is changed and 0 is returned.
  Otherwise the lock is turned into TL_WRITE_DELAYED, moved from the
  list of granted write locks to the head of the write-wait list, the
  waiting read locks are released, and the lock is upgraded again through
  thr_upgrade_write_delay_lock(), whose result is returned.
*/
bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
{
  THR_LOCK *const tbl_lock= data->lock;

  pthread_mutex_lock(&tbl_lock->mutex);
  if (!tbl_lock->read_wait.data)                /* No waiting read locks */
  {
    pthread_mutex_unlock(&tbl_lock->mutex);
    return(0);
  }

  data->type= TL_WRITE_DELAYED;
  if (tbl_lock->update_status)
    (*tbl_lock->update_status)(data->status_param);

  /* Unlink from the list of granted write locks */
  *data->prev= data->next;
  if (data->next)
    data->next->prev= data->prev;
  else
    tbl_lock->write.last= data->prev;

  /* Link in as the first element of the write-wait list */
  data->next= tbl_lock->write_wait.data;
  if (data->next)
    data->next->prev= &data->next;
  else
    tbl_lock->write_wait.last= &data->next;
  data->prev= &tbl_lock->write_wait.data;
  data->cond= get_cond();                       /* This was zero */
  tbl_lock->write_wait.data= data;
  free_all_read_locks(tbl_lock,0);

  pthread_mutex_unlock(&tbl_lock->mutex);
  return(thr_upgrade_write_delay_lock(data));
}
 
1212
 
 
1213
 
 
1214
#include <my_sys.h>
 
1215
 
 
1216
/*****************************************************************************
 
1217
** Test of thread locks
 
1218
****************************************************************************/
 
1219
 
 
1220
#ifdef MAIN
 
1221
 
 
1222
struct st_test {
 
1223
  uint32_t lock_nr;
 
1224
  enum thr_lock_type lock_type;
 
1225
};
 
1226
 
 
1227
THR_LOCK locks[5];                      /* 4 locks */
 
1228
 
 
1229
struct st_test test_0[] = {{0,TL_READ}};        /* One lock */
 
1230
struct st_test test_1[] = {{0,TL_READ},{0,TL_WRITE}}; /* Read and write lock of lock 0 */
 
1231
struct st_test test_2[] = {{1,TL_WRITE},{0,TL_READ},{2,TL_READ}};
 
1232
struct st_test test_3[] = {{2,TL_WRITE},{1,TL_READ},{0,TL_READ}}; /* Deadlock with test_2 ? */
 
1233
struct st_test test_4[] = {{0,TL_WRITE},{0,TL_READ},{0,TL_WRITE},{0,TL_READ}};
 
1234
struct st_test test_5[] = {{0,TL_READ},{1,TL_READ},{2,TL_READ},{3,TL_READ}}; /* Many reads */
 
1235
struct st_test test_6[] = {{0,TL_WRITE},{1,TL_WRITE},{2,TL_WRITE},{3,TL_WRITE}}; /* Many writes */
 
1236
struct st_test test_7[] = {{3,TL_READ}};
 
1237
struct st_test test_8[] = {{1,TL_READ_NO_INSERT},{2,TL_READ_NO_INSERT},{3,TL_READ_NO_INSERT}};  /* Should be quick */
 
1238
struct st_test test_9[] = {{4,TL_READ_HIGH_PRIORITY}};
 
1239
struct st_test test_10[] ={{4,TL_WRITE}};
 
1240
struct st_test test_11[] = {{0,TL_WRITE_LOW_PRIORITY},{1,TL_WRITE_LOW_PRIORITY},{2,TL_WRITE_LOW_PRIORITY},{3,TL_WRITE_LOW_PRIORITY}}; /* Many writes */
 
1241
struct st_test test_12[] = {{0,TL_WRITE_ALLOW_READ},{1,TL_WRITE_ALLOW_READ},{2,TL_WRITE_ALLOW_READ},{3,TL_WRITE_ALLOW_READ}}; /* Many writes */
 
1242
struct st_test test_13[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_WRITE_CONCURRENT_INSERT},{2,TL_WRITE_CONCURRENT_INSERT},{3,TL_WRITE_CONCURRENT_INSERT}};
 
1243
struct st_test test_14[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_READ}};
 
1244
struct st_test test_15[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_READ}};
 
1245
struct st_test test_16[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_WRITE_ALLOW_WRITE}};
 
1246
 
 
1247
struct st_test *tests[] = {test_0,test_1,test_2,test_3,test_4,test_5,test_6,
 
1248
                           test_7,test_8,test_9,test_10,test_11,test_12,
 
1249
                           test_13,test_14,test_15,test_16};
 
1250
int lock_counts[]= {sizeof(test_0)/sizeof(struct st_test),
 
1251
                    sizeof(test_1)/sizeof(struct st_test),
 
1252
                    sizeof(test_2)/sizeof(struct st_test),
 
1253
                    sizeof(test_3)/sizeof(struct st_test),
 
1254
                    sizeof(test_4)/sizeof(struct st_test),
 
1255
                    sizeof(test_5)/sizeof(struct st_test),
 
1256
                    sizeof(test_6)/sizeof(struct st_test),
 
1257
                    sizeof(test_7)/sizeof(struct st_test),
 
1258
                    sizeof(test_8)/sizeof(struct st_test),
 
1259
                    sizeof(test_9)/sizeof(struct st_test),
 
1260
                    sizeof(test_10)/sizeof(struct st_test),
 
1261
                    sizeof(test_11)/sizeof(struct st_test),
 
1262
                    sizeof(test_12)/sizeof(struct st_test),
 
1263
                    sizeof(test_13)/sizeof(struct st_test),
 
1264
                    sizeof(test_14)/sizeof(struct st_test),
 
1265
                    sizeof(test_15)/sizeof(struct st_test),
 
1266
                    sizeof(test_16)/sizeof(struct st_test)
 
1267
};
 
1268
 
 
1269
 
 
1270
/* Size of the per-thread lock arrays in test_thread() */
#define MAX_LOCK_COUNT 8

static pthread_mutex_t LOCK_thread_count;       /* Held while changing thread_count */
static pthread_cond_t COND_thread_count;        /* Signalled when a test thread ends */
static uint32_t thread_count;                   /* Test threads not yet ended */
static uint32_t sum= 0;                         /* Accumulator of the busy loop */
 
1276
 
 
1277
/* The following functions are for WRITE_CONCURRENT_INSERT */
 
1278
 
 
1279
/* get_status hook installed on the test locks; deliberately a no-op */
static void test_get_status(void* param __attribute__((unused)),
                            int concurrent_insert __attribute__((unused)))
{
  return;
}
 
1283
 
 
1284
/* update_status hook installed on the test locks; deliberately a no-op */
static void test_update_status(void* param __attribute__((unused)))
{
  return;
}
 
1287
 
 
1288
/* copy_status hook installed on the test locks; deliberately a no-op */
static void test_copy_status(void* to __attribute__((unused)) ,
                             void *from __attribute__((unused)))
{
  return;
}
 
1292
 
 
1293
/* check_status hook installed on the test locks; always reports 0 */
static bool test_check_status(void* param __attribute__((unused)))
{
  return false;
}
 
1297
 
 
1298
 
 
1299
static void *test_thread(void *arg)
 
1300
{
 
1301
  int i,j,param=*((int*) arg);
 
1302
  THR_LOCK_DATA data[MAX_LOCK_COUNT];
 
1303
  THR_LOCK_OWNER owner;
 
1304
  THR_LOCK_INFO lock_info;
 
1305
  THR_LOCK_DATA *multi_locks[MAX_LOCK_COUNT];
 
1306
  my_thread_init();
 
1307
 
 
1308
  printf("Thread %s (%d) started\n",my_thread_name(),param); fflush(stdout);
 
1309
 
 
1310
 
 
1311
  thr_lock_info_init(&lock_info);
 
1312
  thr_lock_owner_init(&owner, &lock_info);
 
1313
  for (i=0; i < lock_counts[param] ; i++)
 
1314
    thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL);
 
1315
  for (j=1 ; j < 10 ; j++)              /* try locking 10 times */
 
1316
  {
 
1317
    for (i=0; i < lock_counts[param] ; i++)
 
1318
    {                                   /* Init multi locks */
 
1319
      multi_locks[i]= &data[i];
 
1320
      data[i].type= tests[param][i].lock_type;
 
1321
    }
 
1322
    thr_multi_lock(multi_locks, lock_counts[param], &owner);
 
1323
    pthread_mutex_lock(&LOCK_thread_count);
 
1324
    {
 
1325
      int tmp=rand() & 7;                       /* Do something from 0-2 sec */
 
1326
      if (tmp == 0)
 
1327
        sleep(1);
 
1328
      else if (tmp == 1)
 
1329
        sleep(2);
 
1330
      else
 
1331
      {
 
1332
        uint32_t k;
 
1333
        for (k=0 ; k < (uint32_t) (tmp-2)*100000L ; k++)
 
1334
          sum+=k;
 
1335
      }
 
1336
    }
 
1337
    pthread_mutex_unlock(&LOCK_thread_count);
 
1338
    thr_multi_unlock(multi_locks,lock_counts[param]);
 
1339
  }
 
1340
 
 
1341
  printf("Thread %s (%d) ended\n",my_thread_name(),param); fflush(stdout);
 
1342
  thr_print_locks();
 
1343
  pthread_mutex_lock(&LOCK_thread_count);
 
1344
  thread_count--;
 
1345
  pthread_cond_signal(&COND_thread_count); /* Tell main we are ready */
 
1346
  pthread_mutex_unlock(&LOCK_thread_count);
 
1347
  free((unsigned char*) arg);
 
1348
  return 0;
 
1349
}
 
1350
 
 
1351
 
 
1352
int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
 
1353
{
 
1354
  pthread_t tid;
 
1355
  pthread_attr_t thr_attr;
 
1356
  int i,*param,error;
 
1357
  MY_INIT(argv[0]);
 
1358
 
 
1359
  printf("Main thread: %s\n",my_thread_name());
 
1360
 
 
1361
  if ((error=pthread_cond_init(&COND_thread_count,NULL)))
 
1362
  {
 
1363
    fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
 
1364
            error,errno);
 
1365
    exit(1);
 
1366
  }
 
1367
  if ((error=pthread_mutex_init(&LOCK_thread_count,MY_MUTEX_INIT_FAST)))
 
1368
  {
 
1369
    fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
 
1370
            error,errno);
 
1371
    exit(1);
 
1372
  }
 
1373
 
 
1374
  for (i=0 ; i < (int) array_elements(locks) ; i++)
 
1375
  {
 
1376
    thr_lock_init(locks+i);
 
1377
    locks[i].check_status= test_check_status;
 
1378
    locks[i].update_status=test_update_status;
 
1379
    locks[i].copy_status=  test_copy_status;
 
1380
    locks[i].get_status=   test_get_status;
 
1381
  }
 
1382
  if ((error=pthread_attr_init(&thr_attr)))
 
1383
  {
 
1384
    fprintf(stderr,"Got error: %d from pthread_attr_init (errno: %d)",
 
1385
            error,errno);
 
1386
    exit(1);
 
1387
  }
 
1388
  if ((error=pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED)))
 
1389
  {
 
1390
    fprintf(stderr,
 
1391
            "Got error: %d from pthread_attr_setdetachstate (errno: %d)",
 
1392
            error,errno);
 
1393
    exit(1);
 
1394
  }
 
1395
#ifndef pthread_attr_setstacksize               /* void return value */
 
1396
  if ((error=pthread_attr_setstacksize(&thr_attr,65536L)))
 
1397
  {
 
1398
    fprintf(stderr,"Got error: %d from pthread_attr_setstacksize (errno: %d)",
 
1399
            error,errno);
 
1400
    exit(1);
 
1401
  }
 
1402
#endif
 
1403
#ifdef HAVE_THR_SETCONCURRENCY
 
1404
  thr_setconcurrency(2);
 
1405
#endif
 
1406
  for (i=0 ; i < (int) array_elements(lock_counts) ; i++)
 
1407
  {
 
1408
    param=(int*) malloc(sizeof(int));
 
1409
    *param=i;
 
1410
 
 
1411
    if ((error=pthread_mutex_lock(&LOCK_thread_count)))
 
1412
    {
 
1413
      fprintf(stderr,"Got error: %d from pthread_mutex_lock (errno: %d)",
 
1414
              error,errno);
 
1415
      exit(1);
 
1416
    }
 
1417
    if ((error=pthread_create(&tid,&thr_attr,test_thread,(void*) param)))
 
1418
    {
 
1419
      fprintf(stderr,"Got error: %d from pthread_create (errno: %d)\n",
 
1420
              error,errno);
 
1421
      pthread_mutex_unlock(&LOCK_thread_count);
 
1422
      exit(1);
 
1423
    }
 
1424
    thread_count++;
 
1425
    pthread_mutex_unlock(&LOCK_thread_count);
 
1426
  }
 
1427
 
 
1428
  pthread_attr_destroy(&thr_attr);
 
1429
  if ((error=pthread_mutex_lock(&LOCK_thread_count)))
 
1430
    fprintf(stderr,"Got error: %d from pthread_mutex_lock\n",error);
 
1431
  while (thread_count)
 
1432
  {
 
1433
    if ((error=pthread_cond_wait(&COND_thread_count,&LOCK_thread_count)))
 
1434
      fprintf(stderr,"Got error: %d from pthread_cond_wait\n",error);
 
1435
  }
 
1436
  if ((error=pthread_mutex_unlock(&LOCK_thread_count)))
 
1437
    fprintf(stderr,"Got error: %d from pthread_mutex_unlock\n",error);
 
1438
  for (i=0 ; i < (int) array_elements(locks) ; i++)
 
1439
    thr_lock_delete(locks+i);
 
1440
#ifdef EXTRA_DEBUG
 
1441
  if (found_errors)
 
1442
    printf("Got %d warnings\n",found_errors);
 
1443
  else
 
1444
#endif
 
1445
    printf("Test succeeded\n");
 
1446
  return 0;
 
1447
}
 
1448
 
 
1449
#endif /* MAIN */