~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/thr_lock.c

  • Committer: Monty Taylor
  • Date: 2008-10-08 01:10:45 UTC
  • mto: This revision was merged to the branch mainline in revision 491.
  • Revision ID: monty@inaugust.com-20081008011045-zqozbc81f8qhmxok
Get rid of pragma interface/pragma implementation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All tread must acquire
26
26
 
27
27
TL_READ                 # Low priority read
28
28
TL_READ_WITH_SHARED_LOCKS
 
29
TL_READ_HIGH_PRIORITY   # High priority read
29
30
TL_READ_NO_INSERT       # Read without concurrent inserts
30
31
TL_WRITE_ALLOW_WRITE    # Write lock that allows other writers
31
32
TL_WRITE_ALLOW_READ     # Write lock, but allow reading
32
33
TL_WRITE_CONCURRENT_INSERT
33
34
                        # Insert that can be mixed when selects
 
35
TL_WRITE_DELAYED        # Used by delayed insert
 
36
                        # Allows lower locks to take over
 
37
TL_WRITE_LOW_PRIORITY   # Low priority write
34
38
TL_WRITE                # High priority write
35
39
TL_WRITE_ONLY           # High priority write
36
40
                        # Abort all new lock request with an error
46
50
should put a pointer to the following functions in the lock structure:
47
51
(If the pointer is zero (default), the function is not called)
48
52
 
 
53
check_status:
 
54
         Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
 
55
         we check if this function exists and returns 0.
 
56
         If not, then the lock is upgraded to TL_WRITE_LOCK
 
57
         In MyISAM this is a simple check if the insert can be done
 
58
         at the end of the datafile.
 
59
update_status:
 
60
        Before a write lock is released, this function is called.
 
61
        In MyISAM this functions updates the count and length of the datafile
 
62
get_status:
 
63
        When one gets a lock this functions is called.
 
64
        In MyISAM this stores the number of rows and size of the datafile
 
65
        for concurrent reads.
49
66
 
50
67
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
 
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
 
68
TL_WRITE_CONCURRENT_INSERT or one TL_WRITE_DELAYED lock at the same time as
 
69
multiple read locks.
52
70
 
53
71
*/
54
72
 
55
 
#include <config.h>
56
 
#include <drizzled/internal/my_sys.h>
57
 
#include <drizzled/internal/thread_var.h>
58
 
#include <drizzled/statistics_variables.h>
59
 
#include <drizzled/pthread_globals.h>
60
 
 
61
 
#include <drizzled/session.h>
 
73
#include "mysys_priv.h"
62
74
 
63
75
#include "thr_lock.h"
64
 
#include <drizzled/internal/m_string.h>
 
76
#include <mystrings/m_string.h>
65
77
#include <errno.h>
66
 
#include <list>
67
78
 
68
79
#if TIME_WITH_SYS_TIME
69
80
# include <sys/time.h>
74
85
# else
75
86
#  include <time.h>
76
87
# endif
77
 
#endif
78
 
 
79
 
#include <drizzled/util/test.h>
80
 
 
81
 
#include <boost/interprocess/sync/lock_options.hpp>
82
 
 
83
 
using namespace std;
84
 
 
85
 
namespace drizzled
 
88
#endif  
 
89
 
 
90
 
 
91
bool thr_lock_inited=0;
 
92
uint32_t locks_immediate = 0L, locks_waited = 0L;
 
93
ulong table_lock_wait_timeout;
 
94
enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
 
95
 
 
96
/* The following constants are only for debug output */
 
97
#define MAX_THREADS 100
 
98
#define MAX_LOCKS   100
 
99
 
 
100
 
 
101
LIST *thr_lock_thread_list;                     /* List of threads in use */
 
102
ulong max_write_lock_count= ~(ulong) 0L;
 
103
 
 
104
static inline pthread_cond_t *get_cond(void)
86
105
{
87
 
 
88
 
uint64_t table_lock_wait_timeout;
89
 
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
90
 
 
91
 
 
92
 
uint64_t max_write_lock_count= UINT64_MAX;
 
106
  return &my_thread_var->suspend;
 
107
}
93
108
 
94
109
/*
95
110
** For the future (now the thread specific cond is alloced by my_pthread.c)
96
111
*/
97
112
 
 
113
bool init_thr_lock()
 
114
{
 
115
  thr_lock_inited=1;
 
116
  return 0;
 
117
}
 
118
 
98
119
static inline bool
99
120
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
100
121
{
101
122
  return rhs == lhs;
102
123
}
103
124
 
 
125
#ifdef EXTRA_DEBUG
 
126
#define MAX_FOUND_ERRORS        10              /* Report 10 first errors */
 
127
static uint32_t found_errors=0;
 
128
 
 
129
static int check_lock(struct st_lock_list *list, const char* lock_type,
 
130
                      const char *where, bool same_owner, bool no_cond)
 
131
{
 
132
  THR_LOCK_DATA *data,**prev;
 
133
  uint32_t count=0;
 
134
  THR_LOCK_OWNER *first_owner;
 
135
 
 
136
  prev= &list->data;
 
137
  if (list->data)
 
138
  {
 
139
    enum thr_lock_type last_lock_type=list->data->type;
 
140
 
 
141
    if (same_owner && list->data)
 
142
      first_owner= list->data->owner;
 
143
    for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next)
 
144
    {
 
145
      if (data->type != last_lock_type)
 
146
        last_lock_type=TL_IGNORE;
 
147
      if (data->prev != prev)
 
148
      {
 
149
        fprintf(stderr,
 
150
                "Warning: prev link %d didn't point at previous lock at %s: %s\n",
 
151
                count, lock_type, where);
 
152
        return 1;
 
153
      }
 
154
      if (same_owner &&
 
155
          !thr_lock_owner_equal(data->owner, first_owner) &&
 
156
          last_lock_type != TL_WRITE_ALLOW_WRITE)
 
157
      {
 
158
        fprintf(stderr,
 
159
                "Warning: Found locks from different threads in %s: %s\n",
 
160
                lock_type,where);
 
161
        return 1;
 
162
      }
 
163
      if (no_cond && data->cond)
 
164
      {
 
165
        fprintf(stderr,
 
166
                "Warning: Found active lock with not reset cond %s: %s\n",
 
167
                lock_type,where);
 
168
        return 1;
 
169
      }
 
170
      prev= &data->next;
 
171
    }
 
172
    if (data)
 
173
    {
 
174
      fprintf(stderr,"Warning: found too many locks at %s: %s\n",
 
175
              lock_type,where);
 
176
      return 1;
 
177
    }
 
178
  }
 
179
  if (prev != list->last)
 
180
  {
 
181
    fprintf(stderr,"Warning: last didn't point at last lock at %s: %s\n",
 
182
            lock_type, where);
 
183
    return 1;
 
184
  }
 
185
  return 0;
 
186
}
 
187
 
 
188
 
 
189
static void check_locks(THR_LOCK *lock, const char *where,
 
190
                        bool allow_no_locks)
 
191
{
 
192
  uint32_t old_found_errors=found_errors;
 
193
 
 
194
  if (found_errors < MAX_FOUND_ERRORS)
 
195
  {
 
196
    if (check_lock(&lock->write,"write",where,1,1) |
 
197
        check_lock(&lock->write_wait,"write_wait",where,0,0) |
 
198
        check_lock(&lock->read,"read",where,0,1) |
 
199
        check_lock(&lock->read_wait,"read_wait",where,0,0))
 
200
      found_errors++;
 
201
 
 
202
    if (found_errors < MAX_FOUND_ERRORS)
 
203
    {
 
204
      uint32_t count=0;
 
205
      THR_LOCK_DATA *data;
 
206
      for (data=lock->read.data ; data ; data=data->next)
 
207
      {
 
208
        if ((int) data->type == (int) TL_READ_NO_INSERT)
 
209
          count++;
 
210
        /* Protect against infinite loop. */
 
211
        assert(count <= lock->read_no_write_count);
 
212
      }
 
213
      if (count != lock->read_no_write_count)
 
214
      {
 
215
        found_errors++;
 
216
        fprintf(stderr,
 
217
                "Warning at '%s': Locks read_no_write_count was %u when it should have been %u\n", where, lock->read_no_write_count,count);
 
218
      }      
 
219
 
 
220
      if (!lock->write.data)
 
221
      {
 
222
        if (!allow_no_locks && !lock->read.data &&
 
223
            (lock->write_wait.data || lock->read_wait.data))
 
224
        {
 
225
          found_errors++;
 
226
          fprintf(stderr,
 
227
                  "Warning at '%s': No locks in use but locks are in wait queue\n",
 
228
                  where);
 
229
        }
 
230
        if (!lock->write_wait.data)
 
231
        {
 
232
          if (!allow_no_locks && lock->read_wait.data)
 
233
          {
 
234
            found_errors++;
 
235
            fprintf(stderr,
 
236
                    "Warning at '%s': No write locks and waiting read locks\n",
 
237
                    where);
 
238
          }
 
239
        }
 
240
        else
 
241
        {
 
242
          if (!allow_no_locks &&
 
243
              (((lock->write_wait.data->type == TL_WRITE_CONCURRENT_INSERT ||
 
244
                 lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE) &&
 
245
                !lock->read_no_write_count) ||
 
246
               lock->write_wait.data->type == TL_WRITE_ALLOW_READ ||
 
247
               (lock->write_wait.data->type == TL_WRITE_DELAYED &&
 
248
                !lock->read.data)))
 
249
          {
 
250
            found_errors++;
 
251
            fprintf(stderr,
 
252
                    "Warning at '%s': Write lock %d waiting while no exclusive read locks\n",where,(int) lock->write_wait.data->type);
 
253
          }
 
254
        }             
 
255
      }
 
256
      else
 
257
      {                                         /* Have write lock */
 
258
        if (lock->write_wait.data)
 
259
        {
 
260
          if (!allow_no_locks && 
 
261
              lock->write.data->type == TL_WRITE_ALLOW_WRITE &&
 
262
              lock->write_wait.data->type == TL_WRITE_ALLOW_WRITE)
 
263
          {
 
264
            found_errors++;
 
265
            fprintf(stderr,
 
266
                    "Warning at '%s': Found WRITE_ALLOW_WRITE lock waiting for WRITE_ALLOW_WRITE lock\n",
 
267
                    where);
 
268
          }
 
269
        }
 
270
        if (lock->read.data)
 
271
        {
 
272
          if (!thr_lock_owner_equal(lock->write.data->owner,
 
273
                                    lock->read.data->owner) &&
 
274
              ((lock->write.data->type > TL_WRITE_DELAYED &&
 
275
                lock->write.data->type != TL_WRITE_ONLY) ||
 
276
               ((lock->write.data->type == TL_WRITE_CONCURRENT_INSERT ||
 
277
                 lock->write.data->type == TL_WRITE_ALLOW_WRITE) &&
 
278
                lock->read_no_write_count)))
 
279
          {
 
280
            found_errors++;
 
281
            fprintf(stderr,
 
282
                    "Warning at '%s': Found lock of type %d that is write and read locked\n",
 
283
                    where, lock->write.data->type);
 
284
          }
 
285
        }
 
286
        if (lock->read_wait.data)
 
287
        {
 
288
          if (!allow_no_locks && lock->write.data->type <= TL_WRITE_DELAYED &&
 
289
              lock->read_wait.data->type <= TL_READ_HIGH_PRIORITY)
 
290
          {
 
291
            found_errors++;
 
292
            fprintf(stderr,
 
293
                    "Warning at '%s': Found read lock of type %d waiting for write lock of type %d\n",
 
294
                    where,
 
295
                    (int) lock->read_wait.data->type,
 
296
                    (int) lock->write.data->type);
 
297
          }
 
298
        }
 
299
      }
 
300
    }
 
301
  }
 
302
  return;
 
303
}
 
304
 
 
305
#else /* EXTRA_DEBUG */
 
306
#define check_locks(A,B,C)
 
307
#endif
 
308
 
104
309
 
105
310
        /* Initialize a lock */
106
311
 
107
312
void thr_lock_init(THR_LOCK *lock)
108
313
{
109
 
  lock->init();
 
314
  memset(lock, 0, sizeof(*lock));
 
315
  pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
110
316
  lock->read.last= &lock->read.data;
111
317
  lock->read_wait.last= &lock->read_wait.data;
112
318
  lock->write_wait.last= &lock->write_wait.data;
113
319
  lock->write.last= &lock->write.data;
114
 
}
115
 
 
116
 
 
117
 
void THR_LOCK_INFO::init()
118
 
{
119
 
  internal::st_my_thread_var *tmp= my_thread_var;
120
 
  thread_id= tmp->id;
121
 
  n_cursors= 0;
 
320
 
 
321
  pthread_mutex_lock(&THR_LOCK_lock);           /* Add to locks in use */
 
322
  lock->list.data=(void*) lock;
 
323
  thr_lock_thread_list=list_add(thr_lock_thread_list,&lock->list);
 
324
  pthread_mutex_unlock(&THR_LOCK_lock);
 
325
  return;
 
326
}
 
327
 
 
328
 
 
329
void thr_lock_delete(THR_LOCK *lock)
 
330
{
 
331
  pthread_mutex_destroy(&lock->mutex);
 
332
  pthread_mutex_lock(&THR_LOCK_lock);
 
333
  thr_lock_thread_list=list_delete(thr_lock_thread_list,&lock->list);
 
334
  pthread_mutex_unlock(&THR_LOCK_lock);
 
335
  return;
 
336
}
 
337
 
 
338
 
 
339
void thr_lock_info_init(THR_LOCK_INFO *info)
 
340
{
 
341
  struct st_my_thread_var *tmp= my_thread_var;
 
342
  info->thread=    tmp->pthread_self;
 
343
  info->thread_id= tmp->id;
 
344
  info->n_cursors= 0;
122
345
}
123
346
 
124
347
        /* Initialize a lock instance */
125
348
 
126
 
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
 
349
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
127
350
{
128
 
  lock= lock_arg;
129
 
  type= TL_UNLOCK;
130
 
  owner= NULL;                               /* no owner yet */
131
 
  status_param= param_arg;
132
 
  cond= NULL;
 
351
  data->lock=lock;
 
352
  data->type=TL_UNLOCK;
 
353
  data->owner= 0;                               /* no owner yet */
 
354
  data->status_param=param;
 
355
  data->cond=0;
133
356
}
134
357
 
135
358
 
139
362
  for ( ; data ; data=data->next)
140
363
  {
141
364
    if (thr_lock_owner_equal(data->owner, owner))
142
 
      return true;                                      /* Already locked by thread */
143
 
  }
144
 
  return false;
145
 
}
 
365
      return 1;                                 /* Already locked by thread */
 
366
  }
 
367
  return 0;
 
368
}
 
369
 
 
370
static inline bool have_specific_lock(THR_LOCK_DATA *data,
 
371
                                         enum thr_lock_type type)
 
372
{
 
373
  for ( ; data ; data=data->next)
 
374
  {
 
375
    if (data->type == type)
 
376
      return 1;
 
377
  }
 
378
  return 0;
 
379
}
 
380
 
146
381
 
147
382
static void wake_up_waiters(THR_LOCK *lock);
148
383
 
149
384
 
150
 
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
 
385
static enum enum_thr_lock_result
 
386
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
 
387
              bool in_wait_list)
151
388
{
152
 
  internal::st_my_thread_var *thread_var= session.getThreadVar();
153
 
 
154
 
  boost::condition_variable_any *cond= &thread_var->suspend;
 
389
  struct st_my_thread_var *thread_var= my_thread_var;
 
390
  pthread_cond_t *cond= &thread_var->suspend;
 
391
  struct timespec wait_timeout;
155
392
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
156
393
  bool can_deadlock= test(data->owner->info->n_cursors);
157
394
 
 
395
  if (!in_wait_list)
158
396
  {
159
397
    (*wait->last)=data;                         /* Wait for lock */
160
398
    data->prev= wait->last;
161
399
    wait->last= &data->next;
162
400
  }
163
401
 
164
 
  current_global_counters.locks_waited++;
 
402
  statistic_increment(locks_waited, &THR_LOCK_lock);
165
403
 
166
404
  /* Set up control struct to allow others to abort locks */
167
 
  thread_var->current_mutex= data->lock->native_handle();
168
 
  thread_var->current_cond=  &thread_var->suspend;
169
 
  data->cond= &thread_var->suspend;;
 
405
  thread_var->current_mutex= &data->lock->mutex;
 
406
  thread_var->current_cond=  cond;
 
407
  data->cond= cond;
170
408
 
171
 
  while (not thread_var->abort)
 
409
  if (can_deadlock)
 
410
    set_timespec(wait_timeout, table_lock_wait_timeout);
 
411
  while (!thread_var->abort || in_wait_list)
172
412
  {
173
 
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
174
 
 
175
 
    if (can_deadlock)
176
 
    {
177
 
      boost::xtime xt; 
178
 
      xtime_get(&xt, boost::TIME_UTC); 
179
 
      xt.sec += table_lock_wait_timeout; 
180
 
      if (not cond->timed_wait(scoped, xt))
181
 
      {
182
 
        result= THR_LOCK_WAIT_TIMEOUT;
183
 
        scoped.release();
184
 
        break;
185
 
      }
186
 
    }
187
 
    else
188
 
    {
189
 
      cond->wait(scoped);
190
 
    }
 
413
    int rc= (can_deadlock ?
 
414
             pthread_cond_timedwait(cond, &data->lock->mutex,
 
415
                                    &wait_timeout) :
 
416
             pthread_cond_wait(cond, &data->lock->mutex));
191
417
    /*
192
418
      We must break the wait if one of the following occurs:
193
419
      - the connection has been aborted (!thread_var->abort), but
201
427
      Order of checks below is important to not report about timeout
202
428
      if the predicate is true.
203
429
    */
204
 
    if (data->cond == NULL)
205
 
    {
206
 
      scoped.release();
207
 
      break;
208
 
    }
209
 
    scoped.release();
 
430
    if (data->cond == 0)
 
431
    {
 
432
      break;
 
433
    }
 
434
    if (rc == ETIMEDOUT || rc == ETIME)
 
435
    {
 
436
      /* purecov: begin inspected */
 
437
      result= THR_LOCK_WAIT_TIMEOUT;
 
438
      break;
 
439
      /* purecov: end */
 
440
    }
210
441
  }
211
442
  if (data->cond || data->type == TL_UNLOCK)
212
443
  {
217
448
      else
218
449
        wait->last=data->prev;
219
450
      data->type= TL_UNLOCK;                    /* No lock */
 
451
      check_locks(data->lock, "killed or timed out wait_for_lock", 1);
220
452
      wake_up_waiters(data->lock);
221
453
    }
 
454
    else
 
455
    {
 
456
      check_locks(data->lock, "aborted wait_for_lock", 0);
 
457
    }
222
458
  }
223
459
  else
224
460
  {
225
461
    result= THR_LOCK_SUCCESS;
 
462
    if (data->lock->get_status)
 
463
      (*data->lock->get_status)(data->status_param, 0);
 
464
    check_locks(data->lock,"got wait_for_lock",0);
226
465
  }
227
 
  data->lock->unlock();
 
466
  pthread_mutex_unlock(&data->lock->mutex);
228
467
 
229
468
  /* The following must be done after unlock of lock->mutex */
230
 
  boost_unique_lock_t scopedLock(thread_var->mutex);
231
 
  thread_var->current_mutex= NULL;
232
 
  thread_var->current_cond= NULL;
 
469
  pthread_mutex_lock(&thread_var->mutex);
 
470
  thread_var->current_mutex= 0;
 
471
  thread_var->current_cond=  0;
 
472
  pthread_mutex_unlock(&thread_var->mutex);
233
473
  return(result);
234
474
}
235
475
 
236
476
 
237
 
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
 
477
enum enum_thr_lock_result
 
478
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
 
479
         enum thr_lock_type lock_type)
238
480
{
239
 
  THR_LOCK *lock= data->lock;
 
481
  THR_LOCK *lock=data->lock;
240
482
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
241
483
  struct st_lock_list *wait_queue;
242
484
  THR_LOCK_DATA *lock_owner;
245
487
  data->cond=0;                                 /* safety */
246
488
  data->type=lock_type;
247
489
  data->owner= owner;                           /* Must be reset ! */
248
 
  lock->lock();
 
490
  pthread_mutex_lock(&lock->mutex);
 
491
  check_locks(lock,(uint) lock_type <= (uint) TL_READ_NO_INSERT ?
 
492
              "enter read_lock" : "enter write_lock",0);
249
493
  if ((int) lock_type <= (int) TL_READ_NO_INSERT)
250
494
  {
251
495
    /* Request for READ lock */
261
505
      */
262
506
 
263
507
      if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
264
 
          (lock->write.data->type <= TL_WRITE_CONCURRENT_INSERT &&
265
 
           (((int) lock_type <= (int) TL_READ_WITH_SHARED_LOCKS) ||
 
508
          (lock->write.data->type <= TL_WRITE_DELAYED &&
 
509
           (((int) lock_type <= (int) TL_READ_HIGH_PRIORITY) ||
266
510
            (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT &&
267
511
             lock->write.data->type != TL_WRITE_ALLOW_READ))))
268
512
      {                                         /* Already got a write lock */
271
515
        lock->read.last= &data->next;
272
516
        if (lock_type == TL_READ_NO_INSERT)
273
517
          lock->read_no_write_count++;
274
 
        current_global_counters.locks_immediate++;
 
518
        check_locks(lock,"read lock with old write lock",0);
 
519
        if (lock->get_status)
 
520
          (*lock->get_status)(data->status_param, 0);
 
521
        statistic_increment(locks_immediate,&THR_LOCK_lock);
275
522
        goto end;
276
523
      }
277
524
      if (lock->write.data->type == TL_WRITE_ONLY)
283
530
      }
284
531
    }
285
532
    else if (!lock->write_wait.data ||
286
 
             lock->write_wait.data->type <= TL_WRITE_DEFAULT ||
 
533
             lock->write_wait.data->type <= TL_WRITE_LOW_PRIORITY ||
 
534
             lock_type == TL_READ_HIGH_PRIORITY ||
287
535
             have_old_read_lock(lock->read.data, data->owner))
288
536
    {                                           /* No important write-locks */
289
537
      (*lock->read.last)=data;                  /* Add to running FIFO */
290
538
      data->prev=lock->read.last;
291
539
      lock->read.last= &data->next;
 
540
      if (lock->get_status)
 
541
        (*lock->get_status)(data->status_param, 0);
292
542
      if (lock_type == TL_READ_NO_INSERT)
293
543
        lock->read_no_write_count++;
294
 
      current_global_counters.locks_immediate++;
 
544
      check_locks(lock,"read lock with no write locks",0);
 
545
      statistic_increment(locks_immediate,&THR_LOCK_lock);
295
546
      goto end;
296
547
    }
297
548
    /*
303
554
  }
304
555
  else                                          /* Request for WRITE lock */
305
556
  {
306
 
    if (lock_type == TL_WRITE_CONCURRENT_INSERT)
 
557
    if (lock_type == TL_WRITE_DELAYED)
 
558
    {
 
559
      if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY)
 
560
      {
 
561
        data->type=TL_UNLOCK;
 
562
        result= THR_LOCK_ABORTED;               /* Can't wait for this one */
 
563
        goto end;
 
564
      }
 
565
      /*
 
566
        if there is a TL_WRITE_ALLOW_READ lock, we have to wait for a lock
 
567
        (TL_WRITE_ALLOW_READ is used for ALTER TABLE in MySQL)
 
568
      */
 
569
      if ((!lock->write.data ||
 
570
           lock->write.data->type != TL_WRITE_ALLOW_READ) &&
 
571
          !have_specific_lock(lock->write_wait.data,TL_WRITE_ALLOW_READ) &&
 
572
          (lock->write.data || lock->read.data))
 
573
      {
 
574
        /* Add delayed write lock to write_wait queue, and return at once */
 
575
        (*lock->write_wait.last)=data;
 
576
        data->prev=lock->write_wait.last;
 
577
        lock->write_wait.last= &data->next;
 
578
        data->cond=get_cond();
 
579
        /*
 
580
          We don't have to do get_status here as we will do it when we change
 
581
          the delayed lock to a real write lock
 
582
        */
 
583
        statistic_increment(locks_immediate,&THR_LOCK_lock);
 
584
        goto end;
 
585
      }
 
586
    }
 
587
    else if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
307
588
      data->type=lock_type= thr_upgraded_concurrent_insert_lock;
308
589
 
309
590
    if (lock->write.data)                       /* If there is a write lock */
338
619
        (*lock->write.last)=data;       /* Add to running fifo */
339
620
        data->prev=lock->write.last;
340
621
        lock->write.last= &data->next;
341
 
        current_global_counters.locks_immediate++;
 
622
        check_locks(lock,"second write lock",0);
 
623
        if (data->lock->get_status)
 
624
          (*data->lock->get_status)(data->status_param, 0);
 
625
        statistic_increment(locks_immediate,&THR_LOCK_lock);
342
626
        goto end;
343
627
      }
344
628
    }
350
634
        if (lock_type == TL_WRITE_CONCURRENT_INSERT)
351
635
        {
352
636
          concurrent_insert= 1;
 
637
          if ((*lock->check_status)(data->status_param))
 
638
          {
 
639
            concurrent_insert= 0;
 
640
            data->type=lock_type= thr_upgraded_concurrent_insert_lock;
 
641
          }
353
642
        }
354
643
 
355
644
        if (!lock->read.data ||
356
 
            (lock_type <= TL_WRITE_CONCURRENT_INSERT &&
 
645
            (lock_type <= TL_WRITE_DELAYED &&
357
646
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
358
647
               lock_type != TL_WRITE_ALLOW_WRITE) ||
359
648
              !lock->read_no_write_count)))
361
650
          (*lock->write.last)=data;             /* Add as current write lock */
362
651
          data->prev=lock->write.last;
363
652
          lock->write.last= &data->next;
364
 
          current_global_counters.locks_immediate++;
 
653
          if (data->lock->get_status)
 
654
            (*data->lock->get_status)(data->status_param, concurrent_insert);
 
655
          check_locks(lock,"only write lock",0);
 
656
          statistic_increment(locks_immediate,&THR_LOCK_lock);
365
657
          goto end;
366
658
        }
367
659
      }
380
672
    result= THR_LOCK_DEADLOCK;
381
673
    goto end;
382
674
  }
383
 
 
384
675
  /* Can't get lock yet;  Wait for it */
385
 
  return(wait_for_lock(session, wait_queue, data));
 
676
  return(wait_for_lock(wait_queue, data, 0));
386
677
end:
387
 
  lock->unlock();
388
 
 
 
678
  pthread_mutex_unlock(&lock->mutex);
389
679
  return(result);
390
680
}
391
681
 
392
682
 
393
 
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
 
683
static inline void free_all_read_locks(THR_LOCK *lock,
 
684
                                       bool using_concurrent_insert)
394
685
{
395
 
  THR_LOCK_DATA *data= lock->read_wait.data;
 
686
  THR_LOCK_DATA *data=lock->read_wait.data;
 
687
 
 
688
  check_locks(lock,"before freeing read locks",1);
396
689
 
397
690
  /* move all locks from read_wait list to read list */
398
691
  (*lock->read.last)=data;
404
697
 
405
698
  do
406
699
  {
407
 
    boost::condition_variable_any *cond= data->cond;
 
700
    pthread_cond_t *cond=data->cond;
408
701
    if ((int) data->type == (int) TL_READ_NO_INSERT)
409
702
    {
410
703
      if (using_concurrent_insert)
411
704
      {
412
705
        /*
413
 
          We can't free this lock;
 
706
          We can't free this lock; 
414
707
          Link lock away from read chain back into read_wait chain
415
708
        */
416
709
        if (((*data->prev)=data->next))
423
716
        continue;
424
717
      }
425
718
      lock->read_no_write_count++;
426
 
    }
427
 
    data->cond= NULL;                           /* Mark thread free */
428
 
    cond->notify_one();
 
719
    }      
 
720
    data->cond=0;                               /* Mark thread free */
 
721
    pthread_cond_signal(cond);
429
722
  } while ((data=data->next));
430
723
  *lock->read_wait.last=0;
431
724
  if (!lock->read_wait.data)
432
725
    lock->write_lock_count=0;
 
726
  check_locks(lock,"after giving read locks",0);
433
727
}
434
728
 
435
 
/* Unlock lock and free next thread on same lock */
 
729
        /* Unlock lock and free next thread on same lock */
436
730
 
437
 
static void thr_unlock(THR_LOCK_DATA *data)
 
731
void thr_unlock(THR_LOCK_DATA *data)
438
732
{
439
733
  THR_LOCK *lock=data->lock;
440
734
  enum thr_lock_type lock_type=data->type;
441
 
  lock->lock();
 
735
  pthread_mutex_lock(&lock->mutex);
 
736
  check_locks(lock,"start of release lock",0);
442
737
 
443
738
  if (((*data->prev)=data->next))               /* remove from lock-list */
444
739
    data->next->prev= data->prev;
445
740
  else if (lock_type <= TL_READ_NO_INSERT)
446
741
    lock->read.last=data->prev;
 
742
  else if (lock_type == TL_WRITE_DELAYED && data->cond)
 
743
  {
 
744
    /*
 
745
      This only happens in extreme circumstances when a 
 
746
      write delayed lock that is waiting for a lock
 
747
    */
 
748
    lock->write_wait.last=data->prev;           /* Put it on wait queue */
 
749
  }
447
750
  else
448
751
    lock->write.last=data->prev;
449
752
  if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
450
 
  { }
 
753
  {
 
754
    if (lock->update_status)
 
755
      (*lock->update_status)(data->status_param);
 
756
  }
451
757
  else
452
 
  { }
 
758
  {
 
759
    if (lock->restore_status)
 
760
      (*lock->restore_status)(data->status_param);
 
761
  }
453
762
  if (lock_type == TL_READ_NO_INSERT)
454
763
    lock->read_no_write_count--;
455
764
  data->type=TL_UNLOCK;                         /* Mark unlocked */
 
765
  check_locks(lock,"after releasing lock",1);
456
766
  wake_up_waiters(lock);
457
 
  lock->unlock();
 
767
  pthread_mutex_unlock(&lock->mutex);
 
768
  return;
458
769
}
459
770
 
460
771
 
478
789
    {
479
790
      /* Release write-locks with TL_WRITE or TL_WRITE_ONLY priority first */
480
791
      if (data &&
481
 
          (!lock->read_wait.data || lock->read_wait.data->type <= TL_READ_WITH_SHARED_LOCKS))
 
792
          (data->type != TL_WRITE_LOW_PRIORITY || !lock->read_wait.data ||
 
793
           lock->read_wait.data->type < TL_READ_HIGH_PRIORITY))
482
794
      {
483
795
        if (lock->write_lock_count++ > max_write_lock_count)
484
796
        {
500
812
          data->prev=lock->write.last;
501
813
          data->next=0;
502
814
          lock->write.last= &data->next;
503
 
 
 
815
          if (data->type == TL_WRITE_CONCURRENT_INSERT &&
 
816
              (*lock->check_status)(data->status_param))
 
817
            data->type=TL_WRITE;                        /* Upgrade lock */
504
818
          {
505
 
            boost::condition_variable_any *cond= data->cond;
506
 
            data->cond= NULL;                           /* Mark thread free */
507
 
            cond->notify_one(); /* Start waiting thred */
 
819
            pthread_cond_t *cond=data->cond;
 
820
            data->cond=0;                               /* Mark thread free */
 
821
            pthread_cond_signal(cond);  /* Start waiting thread */
508
822
          }
509
823
          if (data->type != TL_WRITE_ALLOW_WRITE ||
510
824
              !lock->write_wait.data ||
512
826
            break;
513
827
          data=lock->write_wait.data;           /* Free this too */
514
828
        }
515
 
        if (data->type >= TL_WRITE)
 
829
        if (data->type >= TL_WRITE_LOW_PRIORITY)
516
830
          goto end;
517
831
        /* Release possible read locks together with the write lock */
518
832
      }
523
837
                             data->type == TL_WRITE_ALLOW_WRITE));
524
838
    }
525
839
    else if (data &&
526
 
             (lock_type=data->type) <= TL_WRITE_CONCURRENT_INSERT &&
 
840
             (lock_type=data->type) <= TL_WRITE_DELAYED &&
527
841
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
528
842
               lock_type != TL_WRITE_ALLOW_WRITE) ||
529
843
              !lock->read_no_write_count))
530
844
    {
 
845
      /*
 
846
        For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
 
847
        start WRITE locks together with the READ locks
 
848
      */
 
849
      if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
 
850
          (*lock->check_status)(data->status_param))
 
851
      {
 
852
        data->type=TL_WRITE;                    /* Upgrade lock */
 
853
        if (lock->read_wait.data)
 
854
          free_all_read_locks(lock,0);
 
855
        goto end;
 
856
      }
531
857
      do {
532
 
        boost::condition_variable_any *cond= data->cond;
 
858
        pthread_cond_t *cond=data->cond;
533
859
        if (((*data->prev)=data->next))         /* remove from wait-list */
534
860
          data->next->prev= data->prev;
535
861
        else
538
864
        data->prev=lock->write.last;
539
865
        lock->write.last= &data->next;
540
866
        data->next=0;                           /* Only one write lock */
541
 
        data->cond= NULL;                               /* Mark thread free */
542
 
        cond->notify_one(); /* Start waiting thread */
 
867
        data->cond=0;                           /* Mark thread free */
 
868
        pthread_cond_signal(cond);      /* Start waiting thread */
543
869
      } while (lock_type == TL_WRITE_ALLOW_WRITE &&
544
870
               (data=lock->write_wait.data) &&
545
871
               data->type == TL_WRITE_ALLOW_WRITE);
549
875
                             lock_type == TL_WRITE_ALLOW_WRITE));
550
876
    }
551
877
    else if (!data && lock->read_wait.data)
552
 
    {
553
878
      free_all_read_locks(lock,0);
554
 
    }
555
879
  }
556
880
end:
 
881
  check_locks(lock, "after waking up waiters", 0);
557
882
  return;
558
883
}
559
884
 
565
890
*/
566
891
 
567
892
 
568
 
#define LOCK_CMP(A,B) ((unsigned char*) (A->lock) - (uint32_t) ((A)->type) < (unsigned char*) (B->lock)- (uint32_t) ((B)->type))
 
893
#define LOCK_CMP(A,B) ((unsigned char*) (A->lock) - (uint) ((A)->type) < (unsigned char*) (B->lock)- (uint) ((B)->type))
569
894
 
570
895
static void sort_locks(THR_LOCK_DATA **data,uint32_t count)
571
896
{
589
914
 
590
915
 
591
916
enum enum_thr_lock_result
592
 
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
 
917
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
593
918
{
594
919
  THR_LOCK_DATA **pos,**end;
595
920
  if (count > 1)
597
922
  /* lock everything */
598
923
  for (pos=data,end=data+count; pos < end ; pos++)
599
924
  {
600
 
    enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
 
925
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
601
926
    if (result != THR_LOCK_SUCCESS)
602
927
    {                                           /* Aborted */
603
 
      thr_multi_unlock(data,(uint32_t) (pos-data));
 
928
      thr_multi_unlock(data,(uint) (pos-data));
604
929
      return(result);
605
930
    }
 
931
#ifdef MAIN
 
932
    printf("Thread: %s  Got lock: 0x%lx  type: %d\n",my_thread_name(),
 
933
           (long) pos[0]->lock, pos[0]->type); fflush(stdout);
 
934
#endif
606
935
  }
607
936
  /*
608
937
    Ensure that all get_locks() have the same status
617
946
    do
618
947
    {
619
948
      pos--;
620
 
      last_lock=(*pos);
 
949
      if (last_lock->lock == (*pos)->lock &&
 
950
          last_lock->lock->copy_status)
 
951
      {
 
952
        if (last_lock->type <= TL_READ_NO_INSERT)
 
953
        {
 
954
          THR_LOCK_DATA **read_lock;
 
955
          /*
 
956
            If we are locking the same table with read locks we must ensure
 
957
            that all tables share the status of the last write lock or
 
958
            the same read lock.
 
959
          */
 
960
          for (;
 
961
               (*pos)->type <= TL_READ_NO_INSERT &&
 
962
                 pos != data &&
 
963
                 pos[-1]->lock == (*pos)->lock ;
 
964
               pos--) ;
 
965
 
 
966
          read_lock = pos+1;
 
967
          do
 
968
          {
 
969
            (last_lock->lock->copy_status)((*read_lock)->status_param,
 
970
                                           (*pos)->status_param);
 
971
          } while (*(read_lock++) != last_lock);
 
972
          last_lock= (*pos);                    /* Point at last write lock */
 
973
        }
 
974
        else
 
975
          (*last_lock->lock->copy_status)((*pos)->status_param,
 
976
                                          last_lock->status_param);
 
977
      }
 
978
      else
 
979
        last_lock=(*pos);
621
980
    } while (pos != data);
622
981
  }
623
982
#endif
632
991
 
633
992
  for (pos=data,end=data+count; pos < end ; pos++)
634
993
  {
 
994
#ifdef MAIN
 
995
    printf("Thread: %s  Rel lock: 0x%lx  type: %d\n",
 
996
           my_thread_name(), (long) pos[0]->lock, pos[0]->type);
 
997
    fflush(stdout);
 
998
#endif
635
999
    if ((*pos)->type != TL_UNLOCK)
636
1000
      thr_unlock(*pos);
637
1001
  }
638
1002
  return;
639
1003
}
640
1004
 
641
 
void DrizzleLock::unlock(uint32_t count)
642
 
{
643
 
  THR_LOCK_DATA **pos,**end;
644
 
 
645
 
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
646
 
  {
647
 
    if ((*pos)->type != TL_UNLOCK)
648
 
      thr_unlock(*pos);
649
 
  }
650
 
}
651
 
 
652
1005
/*
653
1006
  Abort all threads waiting for a lock. The lock will be upgraded to
654
1007
  TL_WRITE_ONLY to abort any new accesses to the lock
655
1008
*/
656
1009
 
657
 
void THR_LOCK::abort_locks()
 
1010
void thr_abort_locks(THR_LOCK *lock, bool upgrade_lock)
658
1011
{
659
 
  boost_unique_lock_t scopedLock(mutex);
 
1012
  THR_LOCK_DATA *data;
 
1013
  pthread_mutex_lock(&lock->mutex);
660
1014
 
661
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
1015
  for (data=lock->read_wait.data; data ; data=data->next)
662
1016
  {
663
 
    local_data->type= TL_UNLOCK;                        /* Mark killed */
 
1017
    data->type=TL_UNLOCK;                       /* Mark killed */
664
1018
    /* It's safe to signal the cond first: we're still holding the mutex. */
665
 
    local_data->cond->notify_one();
666
 
    local_data->cond= NULL;                             /* Removed from list */
 
1019
    pthread_cond_signal(data->cond);
 
1020
    data->cond=0;                               /* Removed from list */
667
1021
  }
668
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
1022
  for (data=lock->write_wait.data; data ; data=data->next)
669
1023
  {
670
 
    local_data->type= TL_UNLOCK;
671
 
    local_data->cond->notify_one();
672
 
    local_data->cond= NULL;
 
1024
    data->type=TL_UNLOCK;
 
1025
    pthread_cond_signal(data->cond);
 
1026
    data->cond=0;
673
1027
  }
674
 
  read_wait.last= &read_wait.data;
675
 
  write_wait.last= &write_wait.data;
676
 
  read_wait.data= write_wait.data=0;
677
 
  if (write.data)
678
 
    write.data->type=TL_WRITE_ONLY;
 
1028
  lock->read_wait.last= &lock->read_wait.data;
 
1029
  lock->write_wait.last= &lock->write_wait.data;
 
1030
  lock->read_wait.data=lock->write_wait.data=0;
 
1031
  if (upgrade_lock && lock->write.data)
 
1032
    lock->write.data->type=TL_WRITE_ONLY;
 
1033
  pthread_mutex_unlock(&lock->mutex);
 
1034
  return;
679
1035
}
680
1036
 
681
1037
 
685
1041
  This is used to abort all locks for a specific thread
686
1042
*/
687
1043
 
688
 
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
 
1044
bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
689
1045
{
 
1046
  THR_LOCK_DATA *data;
690
1047
  bool found= false;
691
1048
 
692
 
  boost_unique_lock_t scopedLock(mutex);
693
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
1049
  pthread_mutex_lock(&lock->mutex);
 
1050
  for (data= lock->read_wait.data; data ; data= data->next)
694
1051
  {
695
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
1052
    if (data->owner->info->thread_id == thread_id)    /* purecov: tested */
696
1053
    {
697
 
      local_data->type= TL_UNLOCK;                      /* Mark killed */
 
1054
      data->type= TL_UNLOCK;                    /* Mark killed */
698
1055
      /* It's safe to signal the cond first: we're still holding the mutex. */
699
1056
      found= true;
700
 
      local_data->cond->notify_one();
701
 
      local_data->cond= 0;                              /* Removed from list */
 
1057
      pthread_cond_signal(data->cond);
 
1058
      data->cond= 0;                            /* Removed from list */
702
1059
 
703
 
      if (((*local_data->prev)= local_data->next))
704
 
        local_data->next->prev= local_data->prev;
 
1060
      if (((*data->prev)= data->next))
 
1061
        data->next->prev= data->prev;
705
1062
      else
706
 
        read_wait.last= local_data->prev;
 
1063
        lock->read_wait.last= data->prev;
707
1064
    }
708
1065
  }
709
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
1066
  for (data= lock->write_wait.data; data ; data= data->next)
710
1067
  {
711
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
1068
    if (data->owner->info->thread_id == thread_id) /* purecov: tested */
712
1069
    {
713
 
      local_data->type= TL_UNLOCK;
 
1070
      data->type= TL_UNLOCK;
714
1071
      found= true;
715
 
      local_data->cond->notify_one();
716
 
      local_data->cond= NULL;
717
 
 
718
 
      if (((*local_data->prev)= local_data->next))
719
 
        local_data->next->prev= local_data->prev;
720
 
      else
721
 
        write_wait.last= local_data->prev;
722
 
    }
723
 
  }
724
 
  wake_up_waiters(this);
725
 
 
726
 
  return found;
727
 
}
728
 
 
729
 
} /* namespace drizzled */
 
1072
      pthread_cond_signal(data->cond);
 
1073
      data->cond= 0;
 
1074
 
 
1075
      if (((*data->prev)= data->next))
 
1076
        data->next->prev= data->prev;
 
1077
      else
 
1078
        lock->write_wait.last= data->prev;
 
1079
    }
 
1080
  }
 
1081
  wake_up_waiters(lock);
 
1082
  pthread_mutex_unlock(&lock->mutex);
 
1083
  return(found);
 
1084
}
 
1085
 
 
1086
 
 
1087
/*
 
1088
  Downgrade a WRITE_* to a lower WRITE level
 
1089
  SYNOPSIS
 
1090
    thr_downgrade_write_lock()
 
1091
    in_data                   Lock data of thread downgrading its lock
 
1092
    new_lock_type             New write lock type
 
1093
  RETURN VALUE
 
1094
    NONE
 
1095
  DESCRIPTION
 
1096
    This can be used to downgrade a lock already owned. When the downgrade
 
1097
    occurs also other waiters, both readers and writers can be allowed to
 
1098
    start.
 
1099
    The previous lock is often TL_WRITE_ONLY but can also be
 
1100
    TL_WRITE and TL_WRITE_ALLOW_READ. The normal downgrade variants are
 
1101
    TL_WRITE_ONLY => TL_WRITE_ALLOW_READ After a short exclusive lock
 
1102
    TL_WRITE_ALLOW_READ => TL_WRITE_ALLOW_WRITE After discovering that the
 
1103
    operation didn't need such a high lock.
 
1104
    TL_WRITE_ONLY => TL_WRITE after a short exclusive lock while holding a
 
1105
    write table lock
 
1106
    TL_WRITE_ONLY => TL_WRITE_ALLOW_WRITE After a short exclusive lock after
 
1107
    already earlier having dongraded lock to TL_WRITE_ALLOW_WRITE
 
1108
    The implementation is conservative and rather don't start rather than
 
1109
    go on unknown paths to start, the common cases are handled.
 
1110
 
 
1111
    NOTE:
 
1112
    In its current implementation it is only allowed to downgrade from
 
1113
    TL_WRITE_ONLY. In this case there are no waiters. Thus no wake up
 
1114
    logic is required.
 
1115
*/
 
1116
 
 
1117
void thr_downgrade_write_lock(THR_LOCK_DATA *in_data,
 
1118
                              enum thr_lock_type new_lock_type)
 
1119
{
 
1120
  THR_LOCK *lock=in_data->lock;
 
1121
 
 
1122
  pthread_mutex_lock(&lock->mutex);
 
1123
  in_data->type= new_lock_type;
 
1124
  check_locks(lock,"after downgrading lock",0);
 
1125
 
 
1126
  pthread_mutex_unlock(&lock->mutex);
 
1127
  return;
 
1128
}
 
1129
 
 
1130
/* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */
 
1131
 
 
1132
bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
 
1133
{
 
1134
  THR_LOCK *lock=data->lock;
 
1135
 
 
1136
  pthread_mutex_lock(&lock->mutex);
 
1137
  if (data->type == TL_UNLOCK || data->type >= TL_WRITE_LOW_PRIORITY)
 
1138
  {
 
1139
    pthread_mutex_unlock(&lock->mutex);
 
1140
    return(data->type == TL_UNLOCK);    /* Test if Aborted */
 
1141
  }
 
1142
  check_locks(lock,"before upgrading lock",0);
 
1143
  /* TODO:  Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */
 
1144
  data->type=TL_WRITE;                          /* Upgrade lock */
 
1145
 
 
1146
  /* Check if someone has given us the lock */
 
1147
  if (!data->cond)
 
1148
  {
 
1149
    if (!lock->read.data)                       /* No read locks */
 
1150
    {                                           /* We have the lock */
 
1151
      if (data->lock->get_status)
 
1152
        (*data->lock->get_status)(data->status_param, 0);
 
1153
      pthread_mutex_unlock(&lock->mutex);
 
1154
      return(0);
 
1155
    }
 
1156
 
 
1157
    if (((*data->prev)=data->next))             /* remove from lock-list */
 
1158
      data->next->prev= data->prev;
 
1159
    else
 
1160
      lock->write.last=data->prev;
 
1161
 
 
1162
    if ((data->next=lock->write_wait.data))     /* Put first in lock_list */
 
1163
      data->next->prev= &data->next;
 
1164
    else
 
1165
      lock->write_wait.last= &data->next;
 
1166
    data->prev= &lock->write_wait.data;
 
1167
    lock->write_wait.data=data;
 
1168
    check_locks(lock,"upgrading lock",0);
 
1169
  }
 
1170
  else
 
1171
  {
 
1172
    check_locks(lock,"waiting for lock",0);
 
1173
  }
 
1174
  return(wait_for_lock(&lock->write_wait,data,1));
 
1175
}
 
1176
 
 
1177
 
 
1178
/* downgrade a WRITE lock to a WRITE_DELAY lock if there is pending locks */
 
1179
 
 
1180
bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
 
1181
{
 
1182
  THR_LOCK *lock=data->lock;
 
1183
 
 
1184
  pthread_mutex_lock(&lock->mutex);
 
1185
  if (!lock->read_wait.data)                    /* No waiting read locks */
 
1186
  {
 
1187
    pthread_mutex_unlock(&lock->mutex);
 
1188
    return(0);
 
1189
  }
 
1190
 
 
1191
  data->type=TL_WRITE_DELAYED;
 
1192
  if (lock->update_status)
 
1193
    (*lock->update_status)(data->status_param);
 
1194
  if (((*data->prev)=data->next))               /* remove from lock-list */
 
1195
    data->next->prev= data->prev;
 
1196
  else
 
1197
    lock->write.last=data->prev;
 
1198
 
 
1199
  if ((data->next=lock->write_wait.data))       /* Put first in lock_list */
 
1200
    data->next->prev= &data->next;
 
1201
  else
 
1202
    lock->write_wait.last= &data->next;
 
1203
  data->prev= &lock->write_wait.data;
 
1204
  data->cond=get_cond();                        /* This was zero */
 
1205
  lock->write_wait.data=data;
 
1206
  free_all_read_locks(lock,0);
 
1207
 
 
1208
  pthread_mutex_unlock(&lock->mutex);
 
1209
  return(thr_upgrade_write_delay_lock(data));
 
1210
}
 
1211
 
 
1212
 
 
1213
#include <my_sys.h>
 
1214
 
 
1215
/*****************************************************************************
 
1216
** Test of thread locks
 
1217
****************************************************************************/
 
1218
 
 
1219
#ifdef MAIN
 
1220
 
 
1221
struct st_test {
 
1222
  uint32_t lock_nr;
 
1223
  enum thr_lock_type lock_type;
 
1224
};
 
1225
 
 
1226
THR_LOCK locks[5];                      /* 4 locks */
 
1227
 
 
1228
struct st_test test_0[] = {{0,TL_READ}};        /* One lock */
 
1229
struct st_test test_1[] = {{0,TL_READ},{0,TL_WRITE}}; /* Read and write lock of lock 0 */
 
1230
struct st_test test_2[] = {{1,TL_WRITE},{0,TL_READ},{2,TL_READ}};
 
1231
struct st_test test_3[] = {{2,TL_WRITE},{1,TL_READ},{0,TL_READ}}; /* Deadlock with test_2 ? */
 
1232
struct st_test test_4[] = {{0,TL_WRITE},{0,TL_READ},{0,TL_WRITE},{0,TL_READ}};
 
1233
struct st_test test_5[] = {{0,TL_READ},{1,TL_READ},{2,TL_READ},{3,TL_READ}}; /* Many reads */
 
1234
struct st_test test_6[] = {{0,TL_WRITE},{1,TL_WRITE},{2,TL_WRITE},{3,TL_WRITE}}; /* Many writes */
 
1235
struct st_test test_7[] = {{3,TL_READ}};
 
1236
struct st_test test_8[] = {{1,TL_READ_NO_INSERT},{2,TL_READ_NO_INSERT},{3,TL_READ_NO_INSERT}};  /* Should be quick */
 
1237
struct st_test test_9[] = {{4,TL_READ_HIGH_PRIORITY}};
 
1238
struct st_test test_10[] ={{4,TL_WRITE}};
 
1239
struct st_test test_11[] = {{0,TL_WRITE_LOW_PRIORITY},{1,TL_WRITE_LOW_PRIORITY},{2,TL_WRITE_LOW_PRIORITY},{3,TL_WRITE_LOW_PRIORITY}}; /* Many writes */
 
1240
struct st_test test_12[] = {{0,TL_WRITE_ALLOW_READ},{1,TL_WRITE_ALLOW_READ},{2,TL_WRITE_ALLOW_READ},{3,TL_WRITE_ALLOW_READ}}; /* Many writes */
 
1241
struct st_test test_13[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_WRITE_CONCURRENT_INSERT},{2,TL_WRITE_CONCURRENT_INSERT},{3,TL_WRITE_CONCURRENT_INSERT}};
 
1242
struct st_test test_14[] = {{0,TL_WRITE_CONCURRENT_INSERT},{1,TL_READ}};
 
1243
struct st_test test_15[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_READ}};
 
1244
struct st_test test_16[] = {{0,TL_WRITE_ALLOW_WRITE},{1,TL_WRITE_ALLOW_WRITE}};
 
1245
 
 
1246
struct st_test *tests[] = {test_0,test_1,test_2,test_3,test_4,test_5,test_6,
 
1247
                           test_7,test_8,test_9,test_10,test_11,test_12,
 
1248
                           test_13,test_14,test_15,test_16};
 
1249
int lock_counts[]= {sizeof(test_0)/sizeof(struct st_test),
 
1250
                    sizeof(test_1)/sizeof(struct st_test),
 
1251
                    sizeof(test_2)/sizeof(struct st_test),
 
1252
                    sizeof(test_3)/sizeof(struct st_test),
 
1253
                    sizeof(test_4)/sizeof(struct st_test),
 
1254
                    sizeof(test_5)/sizeof(struct st_test),
 
1255
                    sizeof(test_6)/sizeof(struct st_test),
 
1256
                    sizeof(test_7)/sizeof(struct st_test),
 
1257
                    sizeof(test_8)/sizeof(struct st_test),
 
1258
                    sizeof(test_9)/sizeof(struct st_test),
 
1259
                    sizeof(test_10)/sizeof(struct st_test),
 
1260
                    sizeof(test_11)/sizeof(struct st_test),
 
1261
                    sizeof(test_12)/sizeof(struct st_test),
 
1262
                    sizeof(test_13)/sizeof(struct st_test),
 
1263
                    sizeof(test_14)/sizeof(struct st_test),
 
1264
                    sizeof(test_15)/sizeof(struct st_test),
 
1265
                    sizeof(test_16)/sizeof(struct st_test)
 
1266
};
 
1267
 
 
1268
 
 
1269
static pthread_cond_t COND_thread_count;
 
1270
static pthread_mutex_t LOCK_thread_count;
 
1271
static uint32_t thread_count;
 
1272
static uint32_t sum=0;
 
1273
 
 
1274
#define MAX_LOCK_COUNT 8
 
1275
 
 
1276
/* The following functions is for WRITE_CONCURRENT_INSERT */
 
1277
 
 
1278
static void test_get_status(void* param __attribute__((unused)),
 
1279
                            int concurrent_insert __attribute__((unused)))
 
1280
{
 
1281
}
 
1282
 
 
1283
static void test_update_status(void* param __attribute__((unused)))
 
1284
{
 
1285
}
 
1286
 
 
1287
static void test_copy_status(void* to __attribute__((unused)) ,
 
1288
                             void *from __attribute__((unused)))
 
1289
{
 
1290
}
 
1291
 
 
1292
static bool test_check_status(void* param __attribute__((unused)))
 
1293
{
 
1294
  return 0;
 
1295
}
 
1296
 
 
1297
 
 
1298
static void *test_thread(void *arg)
 
1299
{
 
1300
  int i,j,param=*((int*) arg);
 
1301
  THR_LOCK_DATA data[MAX_LOCK_COUNT];
 
1302
  THR_LOCK_OWNER owner;
 
1303
  THR_LOCK_INFO lock_info;
 
1304
  THR_LOCK_DATA *multi_locks[MAX_LOCK_COUNT];
 
1305
  my_thread_init();
 
1306
 
 
1307
  printf("Thread %s (%d) started\n",my_thread_name(),param); fflush(stdout);
 
1308
 
 
1309
 
 
1310
  thr_lock_info_init(&lock_info);
 
1311
  thr_lock_owner_init(&owner, &lock_info);
 
1312
  for (i=0; i < lock_counts[param] ; i++)
 
1313
    thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL);
 
1314
  for (j=1 ; j < 10 ; j++)              /* try locking 10 times */
 
1315
  {
 
1316
    for (i=0; i < lock_counts[param] ; i++)
 
1317
    {                                   /* Init multi locks */
 
1318
      multi_locks[i]= &data[i];
 
1319
      data[i].type= tests[param][i].lock_type;
 
1320
    }
 
1321
    thr_multi_lock(multi_locks, lock_counts[param], &owner);
 
1322
    pthread_mutex_lock(&LOCK_thread_count);
 
1323
    {
 
1324
      int tmp=rand() & 7;                       /* Do something from 0-2 sec */
 
1325
      if (tmp == 0)
 
1326
        sleep(1);
 
1327
      else if (tmp == 1)
 
1328
        sleep(2);
 
1329
      else
 
1330
      {
 
1331
        uint32_t k;
 
1332
        for (k=0 ; k < (uint32_t) (tmp-2)*100000L ; k++)
 
1333
          sum+=k;
 
1334
      }
 
1335
    }
 
1336
    pthread_mutex_unlock(&LOCK_thread_count);
 
1337
    thr_multi_unlock(multi_locks,lock_counts[param]);
 
1338
  }
 
1339
 
 
1340
  printf("Thread %s (%d) ended\n",my_thread_name(),param); fflush(stdout);
 
1341
  thr_print_locks();
 
1342
  pthread_mutex_lock(&LOCK_thread_count);
 
1343
  thread_count--;
 
1344
  pthread_cond_signal(&COND_thread_count); /* Tell main we are ready */
 
1345
  pthread_mutex_unlock(&LOCK_thread_count);
 
1346
  free((unsigned char*) arg);
 
1347
  return 0;
 
1348
}
 
1349
 
 
1350
 
 
1351
int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
 
1352
{
 
1353
  pthread_t tid;
 
1354
  pthread_attr_t thr_attr;
 
1355
  int i,*param,error;
 
1356
  MY_INIT(argv[0]);
 
1357
 
 
1358
  printf("Main thread: %s\n",my_thread_name());
 
1359
 
 
1360
  if ((error=pthread_cond_init(&COND_thread_count,NULL)))
 
1361
  {
 
1362
    fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
 
1363
            error,errno);
 
1364
    exit(1);
 
1365
  }
 
1366
  if ((error=pthread_mutex_init(&LOCK_thread_count,MY_MUTEX_INIT_FAST)))
 
1367
  {
 
1368
    fprintf(stderr,"Got error: %d from pthread_cond_init (errno: %d)",
 
1369
            error,errno);
 
1370
    exit(1);
 
1371
  }
 
1372
 
 
1373
  for (i=0 ; i < (int) array_elements(locks) ; i++)
 
1374
  {
 
1375
    thr_lock_init(locks+i);
 
1376
    locks[i].check_status= test_check_status;
 
1377
    locks[i].update_status=test_update_status;
 
1378
    locks[i].copy_status=  test_copy_status;
 
1379
    locks[i].get_status=   test_get_status;
 
1380
  }
 
1381
  if ((error=pthread_attr_init(&thr_attr)))
 
1382
  {
 
1383
    fprintf(stderr,"Got error: %d from pthread_attr_init (errno: %d)",
 
1384
            error,errno);
 
1385
    exit(1);
 
1386
  }
 
1387
  if ((error=pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED)))
 
1388
  {
 
1389
    fprintf(stderr,
 
1390
            "Got error: %d from pthread_attr_setdetachstate (errno: %d)",
 
1391
            error,errno);
 
1392
    exit(1);
 
1393
  }
 
1394
#ifndef pthread_attr_setstacksize               /* void return value */
 
1395
  if ((error=pthread_attr_setstacksize(&thr_attr,65536L)))
 
1396
  {
 
1397
    fprintf(stderr,"Got error: %d from pthread_attr_setstacksize (errno: %d)",
 
1398
            error,errno);
 
1399
    exit(1);
 
1400
  }
 
1401
#endif
 
1402
#ifdef HAVE_THR_SETCONCURRENCY
 
1403
  thr_setconcurrency(2);
 
1404
#endif
 
1405
  for (i=0 ; i < (int) array_elements(lock_counts) ; i++)
 
1406
  {
 
1407
    param=(int*) malloc(sizeof(int));
 
1408
    *param=i;
 
1409
 
 
1410
    if ((error=pthread_mutex_lock(&LOCK_thread_count)))
 
1411
    {
 
1412
      fprintf(stderr,"Got error: %d from pthread_mutex_lock (errno: %d)",
 
1413
              error,errno);
 
1414
      exit(1);
 
1415
    }
 
1416
    if ((error=pthread_create(&tid,&thr_attr,test_thread,(void*) param)))
 
1417
    {
 
1418
      fprintf(stderr,"Got error: %d from pthread_create (errno: %d)\n",
 
1419
              error,errno);
 
1420
      pthread_mutex_unlock(&LOCK_thread_count);
 
1421
      exit(1);
 
1422
    }
 
1423
    thread_count++;
 
1424
    pthread_mutex_unlock(&LOCK_thread_count);
 
1425
  }
 
1426
 
 
1427
  pthread_attr_destroy(&thr_attr);
 
1428
  if ((error=pthread_mutex_lock(&LOCK_thread_count)))
 
1429
    fprintf(stderr,"Got error: %d from pthread_mutex_lock\n",error);
 
1430
  while (thread_count)
 
1431
  {
 
1432
    if ((error=pthread_cond_wait(&COND_thread_count,&LOCK_thread_count)))
 
1433
      fprintf(stderr,"Got error: %d from pthread_cond_wait\n",error);
 
1434
  }
 
1435
  if ((error=pthread_mutex_unlock(&LOCK_thread_count)))
 
1436
    fprintf(stderr,"Got error: %d from pthread_mutex_unlock\n",error);
 
1437
  for (i=0 ; i < (int) array_elements(locks) ; i++)
 
1438
    thr_lock_delete(locks+i);
 
1439
#ifdef EXTRA_DEBUG
 
1440
  if (found_errors)
 
1441
    printf("Got %d warnings\n",found_errors);
 
1442
  else
 
1443
#endif
 
1444
    printf("Test succeeded\n");
 
1445
  return 0;
 
1446
}
 
1447
 
 
1448
#endif /* MAIN */