~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/thr_lock.cc

  • Committer: Brian Aker
  • Date: 2009-03-25 18:24:15 UTC
  • mto: This revision was merged to the branch mainline in revision 963.
  • Revision ID: brian@tangent.org-20090325182415-opf2720c1hidtfgk
Cut down on shutdown loop of plugins (cutting stuff out in order to simplify
old lock system).

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All threads must acquire
26
26
 
27
27
TL_READ                 # Low priority read
28
28
TL_READ_WITH_SHARED_LOCKS
 
29
TL_READ_HIGH_PRIORITY   # High priority read
29
30
TL_READ_NO_INSERT       # Read without concurrent inserts
30
31
TL_WRITE_ALLOW_WRITE    # Write lock that allows other writers
31
32
TL_WRITE_ALLOW_READ     # Write lock, but allow reading
32
33
TL_WRITE_CONCURRENT_INSERT
33
34
                        # Insert that can be mixed when selects
 
35
TL_WRITE_DELAYED        # Used by delayed insert
 
36
                        # Allows lower locks to take over
 
37
TL_WRITE_LOW_PRIORITY   # Low priority write
34
38
TL_WRITE                # High priority write
35
39
TL_WRITE_ONLY           # High priority write
36
40
                        # Abort all new lock request with an error
46
50
should put a pointer to the following functions in the lock structure:
47
51
(If the pointer is zero (default), the function is not called)
48
52
 
 
53
check_status:
 
54
         Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
 
55
         we check if this function exists and returns 0.
 
56
         If not, then the lock is upgraded to TL_WRITE_LOCK
 
57
         In MyISAM this is a simple check if the insert can be done
 
58
         at the end of the datafile.
 
59
update_status:
 
60
        Before a write lock is released, this function is called.
 
61
        In MyISAM this function updates the count and length of the datafile
 
62
get_status:
 
63
        When one gets a lock this function is called.
 
64
        In MyISAM this stores the number of rows and size of the datafile
 
65
        for concurrent reads.
49
66
 
50
67
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
 
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
 
68
TL_WRITE_CONCURRENT_INSERT or one TL_WRITE_DELAYED lock at the same time as
 
69
multiple read locks.
52
70
 
53
71
*/
54
72
 
55
 
#include "config.h"
56
 
#include "drizzled/internal/my_sys.h"
57
 
#include "drizzled/internal/thread_var.h"
58
 
#include "drizzled/statistics_variables.h"
59
 
#include "drizzled/pthread_globals.h"
60
 
 
61
 
#include "drizzled/session.h"
 
73
#include "mysys_priv.h"
62
74
 
63
75
#include "thr_lock.h"
64
 
#include "drizzled/internal/m_string.h"
 
76
#include <mystrings/m_string.h>
65
77
#include <errno.h>
66
78
#include <list>
67
79
 
78
90
 
79
91
#include <drizzled/util/test.h>
80
92
 
81
 
#include <boost/interprocess/sync/lock_options.hpp>
82
 
 
83
93
using namespace std;
84
94
 
85
 
namespace drizzled
86
 
{
87
 
 
 
95
bool thr_lock_inited=0;
 
96
uint32_t locks_immediate = 0L, locks_waited = 0L;
88
97
uint64_t table_lock_wait_timeout;
89
 
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
90
 
 
91
 
 
92
 
uint64_t max_write_lock_count= UINT64_MAX;
93
 
 
94
 
void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count);
 
98
enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
 
99
 
 
100
 
 
101
static list<THR_LOCK *> thr_lock_thread_list;          /* List of threads in use */
 
102
 
 
103
uint64_t max_write_lock_count= ~(uint64_t) 0L;
 
104
 
 
105
static inline pthread_cond_t *get_cond(void)
 
106
{
 
107
  return &my_thread_var->suspend;
 
108
}
95
109
 
96
110
/*
97
111
** For the future (now the thread specific cond is alloced by my_pthread.c)
98
112
*/
99
113
 
 
114
bool init_thr_lock()
 
115
{
 
116
  thr_lock_inited=1;
 
117
  return 0;
 
118
}
 
119
 
100
120
static inline bool
101
121
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
102
122
{
108
128
 
109
129
void thr_lock_init(THR_LOCK *lock)
110
130
{
111
 
  lock->init();
 
131
  memset(lock, 0, sizeof(*lock));
 
132
  pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
112
133
  lock->read.last= &lock->read.data;
113
134
  lock->read_wait.last= &lock->read_wait.data;
114
135
  lock->write_wait.last= &lock->write_wait.data;
115
136
  lock->write.last= &lock->write.data;
116
 
}
117
 
 
118
 
 
119
 
void THR_LOCK_INFO::init()
120
 
{
121
 
  internal::st_my_thread_var *tmp= my_thread_var;
122
 
  thread_id= tmp->id;
123
 
  n_cursors= 0;
 
137
 
 
138
  pthread_mutex_lock(&THR_LOCK_lock);           /* Add to locks in use */
 
139
  thr_lock_thread_list.push_front(lock);
 
140
  pthread_mutex_unlock(&THR_LOCK_lock);
 
141
  return;
 
142
}
 
143
 
 
144
 
 
145
void thr_lock_delete(THR_LOCK *lock)
 
146
{
 
147
  pthread_mutex_destroy(&lock->mutex);
 
148
  pthread_mutex_lock(&THR_LOCK_lock);
 
149
  thr_lock_thread_list.remove(lock);
 
150
  pthread_mutex_unlock(&THR_LOCK_lock);
 
151
  return;
 
152
}
 
153
 
 
154
 
 
155
void thr_lock_info_init(THR_LOCK_INFO *info)
 
156
{
 
157
  struct st_my_thread_var *tmp= my_thread_var;
 
158
  info->thread=    tmp->pthread_self;
 
159
  info->thread_id= tmp->id;
 
160
  info->n_cursors= 0;
124
161
}
125
162
 
126
163
        /* Initialize a lock instance */
127
164
 
128
 
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
 
165
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
129
166
{
130
 
  lock= lock_arg;
131
 
  type= TL_UNLOCK;
132
 
  owner= NULL;                               /* no owner yet */
133
 
  status_param= param_arg;
134
 
  cond= NULL;
 
167
  data->lock=lock;
 
168
  data->type=TL_UNLOCK;
 
169
  data->owner= 0;                               /* no owner yet */
 
170
  data->status_param=param;
 
171
  data->cond=0;
135
172
}
136
173
 
137
174
 
141
178
  for ( ; data ; data=data->next)
142
179
  {
143
180
    if (thr_lock_owner_equal(data->owner, owner))
144
 
      return true;                                      /* Already locked by thread */
145
 
  }
146
 
  return false;
147
 
}
 
181
      return 1;                                 /* Already locked by thread */
 
182
  }
 
183
  return 0;
 
184
}
 
185
 
 
186
static inline bool have_specific_lock(THR_LOCK_DATA *data,
 
187
                                         enum thr_lock_type type)
 
188
{
 
189
  for ( ; data ; data=data->next)
 
190
  {
 
191
    if (data->type == type)
 
192
      return 1;
 
193
  }
 
194
  return 0;
 
195
}
 
196
 
148
197
 
149
198
static void wake_up_waiters(THR_LOCK *lock);
150
199
 
151
200
 
152
 
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
 
201
static enum enum_thr_lock_result
 
202
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
 
203
              bool in_wait_list)
153
204
{
154
 
  internal::st_my_thread_var *thread_var= session.getThreadVar();
155
 
 
156
 
  boost::condition_variable_any *cond= &thread_var->suspend;
 
205
  struct st_my_thread_var *thread_var= my_thread_var;
 
206
  pthread_cond_t *cond= &thread_var->suspend;
 
207
  struct timespec wait_timeout;
157
208
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
158
209
  bool can_deadlock= test(data->owner->info->n_cursors);
159
210
 
 
211
  if (!in_wait_list)
160
212
  {
161
213
    (*wait->last)=data;                         /* Wait for lock */
162
214
    data->prev= wait->last;
163
215
    wait->last= &data->next;
164
216
  }
165
217
 
166
 
  current_global_counters.locks_waited++;
 
218
  statistic_increment(locks_waited, &THR_LOCK_lock);
167
219
 
168
220
  /* Set up control struct to allow others to abort locks */
169
 
  thread_var->current_mutex= data->lock->native_handle();
170
 
  thread_var->current_cond=  &thread_var->suspend;
171
 
  data->cond= &thread_var->suspend;;
 
221
  thread_var->current_mutex= &data->lock->mutex;
 
222
  thread_var->current_cond=  cond;
 
223
  data->cond= cond;
172
224
 
173
 
  while (not thread_var->abort)
 
225
  if (can_deadlock)
 
226
    set_timespec(wait_timeout, table_lock_wait_timeout);
 
227
  while (!thread_var->abort || in_wait_list)
174
228
  {
175
 
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
176
 
 
177
 
    if (can_deadlock)
178
 
    {
179
 
      boost::xtime xt; 
180
 
      xtime_get(&xt, boost::TIME_UTC); 
181
 
      xt.sec += table_lock_wait_timeout; 
182
 
      if (not cond->timed_wait(scoped, xt))
183
 
      {
184
 
        result= THR_LOCK_WAIT_TIMEOUT;
185
 
        scoped.release();
186
 
        break;
187
 
      }
188
 
    }
189
 
    else
190
 
    {
191
 
      cond->wait(scoped);
192
 
    }
 
229
    int rc= (can_deadlock ?
 
230
             pthread_cond_timedwait(cond, &data->lock->mutex,
 
231
                                    &wait_timeout) :
 
232
             pthread_cond_wait(cond, &data->lock->mutex));
193
233
    /*
194
234
      We must break the wait if one of the following occurs:
195
235
      - the connection has been aborted (!thread_var->abort), but
203
243
      Order of checks below is important to not report about timeout
204
244
      if the predicate is true.
205
245
    */
206
 
    if (data->cond == NULL)
207
 
    {
208
 
      scoped.release();
209
 
      break;
210
 
    }
211
 
    scoped.release();
 
246
    if (data->cond == 0)
 
247
    {
 
248
      break;
 
249
    }
 
250
    if (rc == ETIMEDOUT || rc == ETIME)
 
251
    {
 
252
      /* purecov: begin inspected */
 
253
      result= THR_LOCK_WAIT_TIMEOUT;
 
254
      break;
 
255
      /* purecov: end */
 
256
    }
212
257
  }
213
258
  if (data->cond || data->type == TL_UNLOCK)
214
259
  {
225
270
  else
226
271
  {
227
272
    result= THR_LOCK_SUCCESS;
 
273
    if (data->lock->get_status)
 
274
      (*data->lock->get_status)(data->status_param, 0);
228
275
  }
229
 
  data->lock->unlock();
 
276
  pthread_mutex_unlock(&data->lock->mutex);
230
277
 
231
278
  /* The following must be done after unlock of lock->mutex */
232
 
  boost_unique_lock_t scopedLock(thread_var->mutex);
233
 
  thread_var->current_mutex= NULL;
234
 
  thread_var->current_cond= NULL;
 
279
  pthread_mutex_lock(&thread_var->mutex);
 
280
  thread_var->current_mutex= 0;
 
281
  thread_var->current_cond=  0;
 
282
  pthread_mutex_unlock(&thread_var->mutex);
235
283
  return(result);
236
284
}
237
285
 
238
286
 
239
 
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
 
287
enum enum_thr_lock_result
 
288
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
 
289
         enum thr_lock_type lock_type)
240
290
{
241
 
  THR_LOCK *lock= data->lock;
 
291
  THR_LOCK *lock=data->lock;
242
292
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
243
293
  struct st_lock_list *wait_queue;
244
294
  THR_LOCK_DATA *lock_owner;
247
297
  data->cond=0;                                 /* safety */
248
298
  data->type=lock_type;
249
299
  data->owner= owner;                           /* Must be reset ! */
250
 
  lock->lock();
 
300
  pthread_mutex_lock(&lock->mutex);
251
301
  if ((int) lock_type <= (int) TL_READ_NO_INSERT)
252
302
  {
253
303
    /* Request for READ lock */
263
313
      */
264
314
 
265
315
      if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
266
 
          (lock->write.data->type <= TL_WRITE_CONCURRENT_INSERT &&
267
 
           (((int) lock_type <= (int) TL_READ_WITH_SHARED_LOCKS) ||
 
316
          (lock->write.data->type <= TL_WRITE_DELAYED &&
 
317
           (((int) lock_type <= (int) TL_READ_HIGH_PRIORITY) ||
268
318
            (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT &&
269
319
             lock->write.data->type != TL_WRITE_ALLOW_READ))))
270
320
      {                                         /* Already got a write lock */
273
323
        lock->read.last= &data->next;
274
324
        if (lock_type == TL_READ_NO_INSERT)
275
325
          lock->read_no_write_count++;
276
 
        current_global_counters.locks_immediate++;
 
326
        if (lock->get_status)
 
327
          (*lock->get_status)(data->status_param, 0);
 
328
        statistic_increment(locks_immediate,&THR_LOCK_lock);
277
329
        goto end;
278
330
      }
279
331
      if (lock->write.data->type == TL_WRITE_ONLY)
285
337
      }
286
338
    }
287
339
    else if (!lock->write_wait.data ||
288
 
             lock->write_wait.data->type <= TL_WRITE_DEFAULT ||
 
340
             lock->write_wait.data->type <= TL_WRITE_LOW_PRIORITY ||
 
341
             lock_type == TL_READ_HIGH_PRIORITY ||
289
342
             have_old_read_lock(lock->read.data, data->owner))
290
343
    {                                           /* No important write-locks */
291
344
      (*lock->read.last)=data;                  /* Add to running FIFO */
292
345
      data->prev=lock->read.last;
293
346
      lock->read.last= &data->next;
 
347
      if (lock->get_status)
 
348
        (*lock->get_status)(data->status_param, 0);
294
349
      if (lock_type == TL_READ_NO_INSERT)
295
350
        lock->read_no_write_count++;
296
 
      current_global_counters.locks_immediate++;
 
351
      statistic_increment(locks_immediate,&THR_LOCK_lock);
297
352
      goto end;
298
353
    }
299
354
    /*
305
360
  }
306
361
  else                                          /* Request for WRITE lock */
307
362
  {
308
 
    if (lock_type == TL_WRITE_CONCURRENT_INSERT)
 
363
    if (lock_type == TL_WRITE_DELAYED)
 
364
    {
 
365
      if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY)
 
366
      {
 
367
        data->type=TL_UNLOCK;
 
368
        result= THR_LOCK_ABORTED;               /* Can't wait for this one */
 
369
        goto end;
 
370
      }
 
371
      /*
 
372
        if there is a TL_WRITE_ALLOW_READ lock, we have to wait for a lock
 
373
        (TL_WRITE_ALLOW_READ is used for ALTER TABLE in MySQL)
 
374
      */
 
375
      if ((!lock->write.data ||
 
376
           lock->write.data->type != TL_WRITE_ALLOW_READ) &&
 
377
          !have_specific_lock(lock->write_wait.data,TL_WRITE_ALLOW_READ) &&
 
378
          (lock->write.data || lock->read.data))
 
379
      {
 
380
        /* Add delayed write lock to write_wait queue, and return at once */
 
381
        (*lock->write_wait.last)=data;
 
382
        data->prev=lock->write_wait.last;
 
383
        lock->write_wait.last= &data->next;
 
384
        data->cond=get_cond();
 
385
        /*
 
386
          We don't have to do get_status here as we will do it when we change
 
387
          the delayed lock to a real write lock
 
388
        */
 
389
        statistic_increment(locks_immediate,&THR_LOCK_lock);
 
390
        goto end;
 
391
      }
 
392
    }
 
393
    else if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
309
394
      data->type=lock_type= thr_upgraded_concurrent_insert_lock;
310
395
 
311
396
    if (lock->write.data)                       /* If there is a write lock */
340
425
        (*lock->write.last)=data;       /* Add to running fifo */
341
426
        data->prev=lock->write.last;
342
427
        lock->write.last= &data->next;
343
 
        current_global_counters.locks_immediate++;
 
428
        if (data->lock->get_status)
 
429
          (*data->lock->get_status)(data->status_param, 0);
 
430
        statistic_increment(locks_immediate,&THR_LOCK_lock);
344
431
        goto end;
345
432
      }
346
433
    }
352
439
        if (lock_type == TL_WRITE_CONCURRENT_INSERT)
353
440
        {
354
441
          concurrent_insert= 1;
 
442
          if ((*lock->check_status)(data->status_param))
 
443
          {
 
444
            concurrent_insert= 0;
 
445
            data->type=lock_type= thr_upgraded_concurrent_insert_lock;
 
446
          }
355
447
        }
356
448
 
357
449
        if (!lock->read.data ||
358
 
            (lock_type <= TL_WRITE_CONCURRENT_INSERT &&
 
450
            (lock_type <= TL_WRITE_DELAYED &&
359
451
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
360
452
               lock_type != TL_WRITE_ALLOW_WRITE) ||
361
453
              !lock->read_no_write_count)))
363
455
          (*lock->write.last)=data;             /* Add as current write lock */
364
456
          data->prev=lock->write.last;
365
457
          lock->write.last= &data->next;
366
 
          current_global_counters.locks_immediate++;
 
458
          if (data->lock->get_status)
 
459
            (*data->lock->get_status)(data->status_param, concurrent_insert);
 
460
          statistic_increment(locks_immediate,&THR_LOCK_lock);
367
461
          goto end;
368
462
        }
369
463
      }
382
476
    result= THR_LOCK_DEADLOCK;
383
477
    goto end;
384
478
  }
385
 
 
386
479
  /* Can't get lock yet;  Wait for it */
387
 
  return(wait_for_lock(session, wait_queue, data));
 
480
  return(wait_for_lock(wait_queue, data, 0));
388
481
end:
389
 
  lock->unlock();
390
 
 
 
482
  pthread_mutex_unlock(&lock->mutex);
391
483
  return(result);
392
484
}
393
485
 
394
486
 
395
 
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
 
487
static inline void free_all_read_locks(THR_LOCK *lock,
 
488
                                       bool using_concurrent_insert)
396
489
{
397
 
  THR_LOCK_DATA *data= lock->read_wait.data;
 
490
  THR_LOCK_DATA *data=lock->read_wait.data;
398
491
 
399
492
  /* move all locks from read_wait list to read list */
400
493
  (*lock->read.last)=data;
406
499
 
407
500
  do
408
501
  {
409
 
    boost::condition_variable_any *cond= data->cond;
 
502
    pthread_cond_t *cond=data->cond;
410
503
    if ((int) data->type == (int) TL_READ_NO_INSERT)
411
504
    {
412
505
      if (using_concurrent_insert)
426
519
      }
427
520
      lock->read_no_write_count++;
428
521
    }
429
 
    data->cond= NULL;                           /* Mark thread free */
430
 
    cond->notify_one();
 
522
    data->cond=0;                               /* Mark thread free */
 
523
    pthread_cond_signal(cond);
431
524
  } while ((data=data->next));
432
525
  *lock->read_wait.last=0;
433
526
  if (!lock->read_wait.data)
434
527
    lock->write_lock_count=0;
435
528
}
436
529
 
437
 
/* Unlock lock and free next thread on same lock */
 
530
        /* Unlock lock and free next thread on same lock */
438
531
 
439
 
static void thr_unlock(THR_LOCK_DATA *data)
 
532
void thr_unlock(THR_LOCK_DATA *data)
440
533
{
441
534
  THR_LOCK *lock=data->lock;
442
535
  enum thr_lock_type lock_type=data->type;
443
 
  lock->lock();
 
536
  pthread_mutex_lock(&lock->mutex);
444
537
 
445
538
  if (((*data->prev)=data->next))               /* remove from lock-list */
446
539
    data->next->prev= data->prev;
447
540
  else if (lock_type <= TL_READ_NO_INSERT)
448
541
    lock->read.last=data->prev;
 
542
  else if (lock_type == TL_WRITE_DELAYED && data->cond)
 
543
  {
 
544
    /*
 
545
      This only happens in extreme circumstances when a
 
546
      write delayed lock that is waiting for a lock
 
547
    */
 
548
    lock->write_wait.last=data->prev;           /* Put it on wait queue */
 
549
  }
449
550
  else
450
551
    lock->write.last=data->prev;
451
552
  if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
452
 
  { }
 
553
  {
 
554
    if (lock->update_status)
 
555
      (*lock->update_status)(data->status_param);
 
556
  }
453
557
  else
454
 
  { }
 
558
  {
 
559
    if (lock->restore_status)
 
560
      (*lock->restore_status)(data->status_param);
 
561
  }
455
562
  if (lock_type == TL_READ_NO_INSERT)
456
563
    lock->read_no_write_count--;
457
564
  data->type=TL_UNLOCK;                         /* Mark unlocked */
458
565
  wake_up_waiters(lock);
459
 
  lock->unlock();
 
566
  pthread_mutex_unlock(&lock->mutex);
 
567
  return;
460
568
}
461
569
 
462
570
 
480
588
    {
481
589
      /* Release write-locks with TL_WRITE or TL_WRITE_ONLY priority first */
482
590
      if (data &&
483
 
          (!lock->read_wait.data || lock->read_wait.data->type <= TL_READ_WITH_SHARED_LOCKS))
 
591
          (data->type != TL_WRITE_LOW_PRIORITY || !lock->read_wait.data ||
 
592
           lock->read_wait.data->type < TL_READ_HIGH_PRIORITY))
484
593
      {
485
594
        if (lock->write_lock_count++ > max_write_lock_count)
486
595
        {
502
611
          data->prev=lock->write.last;
503
612
          data->next=0;
504
613
          lock->write.last= &data->next;
505
 
 
 
614
          if (data->type == TL_WRITE_CONCURRENT_INSERT &&
 
615
              (*lock->check_status)(data->status_param))
 
616
            data->type=TL_WRITE;                        /* Upgrade lock */
506
617
          {
507
 
            boost::condition_variable_any *cond= data->cond;
508
 
            data->cond= NULL;                           /* Mark thread free */
509
 
            cond->notify_one(); /* Start waiting thread */
 
618
            pthread_cond_t *cond=data->cond;
 
619
            data->cond=0;                               /* Mark thread free */
 
620
            pthread_cond_signal(cond);  /* Start waiting thread */
510
621
          }
511
622
          if (data->type != TL_WRITE_ALLOW_WRITE ||
512
623
              !lock->write_wait.data ||
514
625
            break;
515
626
          data=lock->write_wait.data;           /* Free this too */
516
627
        }
517
 
        if (data->type >= TL_WRITE)
 
628
        if (data->type >= TL_WRITE_LOW_PRIORITY)
518
629
          goto end;
519
630
        /* Release possible read locks together with the write lock */
520
631
      }
525
636
                             data->type == TL_WRITE_ALLOW_WRITE));
526
637
    }
527
638
    else if (data &&
528
 
             (lock_type=data->type) <= TL_WRITE_CONCURRENT_INSERT &&
 
639
             (lock_type=data->type) <= TL_WRITE_DELAYED &&
529
640
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
530
641
               lock_type != TL_WRITE_ALLOW_WRITE) ||
531
642
              !lock->read_no_write_count))
532
643
    {
 
644
      /*
 
645
        For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
 
646
        start WRITE locks together with the READ locks
 
647
      */
 
648
      if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
 
649
          (*lock->check_status)(data->status_param))
 
650
      {
 
651
        data->type=TL_WRITE;                    /* Upgrade lock */
 
652
        if (lock->read_wait.data)
 
653
          free_all_read_locks(lock,0);
 
654
        goto end;
 
655
      }
533
656
      do {
534
 
        boost::condition_variable_any *cond= data->cond;
 
657
        pthread_cond_t *cond=data->cond;
535
658
        if (((*data->prev)=data->next))         /* remove from wait-list */
536
659
          data->next->prev= data->prev;
537
660
        else
540
663
        data->prev=lock->write.last;
541
664
        lock->write.last= &data->next;
542
665
        data->next=0;                           /* Only one write lock */
543
 
        data->cond= NULL;                               /* Mark thread free */
544
 
        cond->notify_one(); /* Start waiting thread */
 
666
        data->cond=0;                           /* Mark thread free */
 
667
        pthread_cond_signal(cond);      /* Start waiting thread */
545
668
      } while (lock_type == TL_WRITE_ALLOW_WRITE &&
546
669
               (data=lock->write_wait.data) &&
547
670
               data->type == TL_WRITE_ALLOW_WRITE);
551
674
                             lock_type == TL_WRITE_ALLOW_WRITE));
552
675
    }
553
676
    else if (!data && lock->read_wait.data)
554
 
    {
555
677
      free_all_read_locks(lock,0);
556
 
    }
557
678
  }
558
679
end:
559
680
  return;
591
712
 
592
713
 
593
714
enum enum_thr_lock_result
594
 
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
 
715
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
595
716
{
596
717
  THR_LOCK_DATA **pos,**end;
597
718
  if (count > 1)
599
720
  /* lock everything */
600
721
  for (pos=data,end=data+count; pos < end ; pos++)
601
722
  {
602
 
    enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
 
723
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
603
724
    if (result != THR_LOCK_SUCCESS)
604
725
    {                                           /* Aborted */
605
726
      thr_multi_unlock(data,(uint32_t) (pos-data));
619
740
    do
620
741
    {
621
742
      pos--;
622
 
      last_lock=(*pos);
 
743
      if (last_lock->lock == (*pos)->lock &&
 
744
          last_lock->lock->copy_status)
 
745
      {
 
746
        if (last_lock->type <= TL_READ_NO_INSERT)
 
747
        {
 
748
          THR_LOCK_DATA **read_lock;
 
749
          /*
 
750
            If we are locking the same table with read locks we must ensure
 
751
            that all tables share the status of the last write lock or
 
752
            the same read lock.
 
753
          */
 
754
          for (;
 
755
               (*pos)->type <= TL_READ_NO_INSERT &&
 
756
                 pos != data &&
 
757
                 pos[-1]->lock == (*pos)->lock ;
 
758
               pos--) ;
 
759
 
 
760
          read_lock = pos+1;
 
761
          do
 
762
          {
 
763
            (last_lock->lock->copy_status)((*read_lock)->status_param,
 
764
                                           (*pos)->status_param);
 
765
          } while (*(read_lock++) != last_lock);
 
766
          last_lock= (*pos);                    /* Point at last write lock */
 
767
        }
 
768
        else
 
769
          (*last_lock->lock->copy_status)((*pos)->status_param,
 
770
                                          last_lock->status_param);
 
771
      }
 
772
      else
 
773
        last_lock=(*pos);
623
774
    } while (pos != data);
624
775
  }
625
776
#endif
640
791
  return;
641
792
}
642
793
 
643
 
void DrizzleLock::unlock(uint32_t count)
644
 
{
645
 
  THR_LOCK_DATA **pos,**end;
646
 
 
647
 
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
648
 
  {
649
 
    if ((*pos)->type != TL_UNLOCK)
650
 
      thr_unlock(*pos);
651
 
  }
652
 
}
653
 
 
654
794
/*
655
795
  Abort all threads waiting for a lock. The lock will be upgraded to
656
796
  TL_WRITE_ONLY to abort any new accesses to the lock
657
797
*/
658
798
 
659
 
void THR_LOCK::abort_locks()
 
799
void thr_abort_locks(THR_LOCK *lock, bool upgrade_lock)
660
800
{
661
 
  boost_unique_lock_t scopedLock(mutex);
 
801
  THR_LOCK_DATA *data;
 
802
  pthread_mutex_lock(&lock->mutex);
662
803
 
663
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
804
  for (data=lock->read_wait.data; data ; data=data->next)
664
805
  {
665
 
    local_data->type= TL_UNLOCK;                        /* Mark killed */
 
806
    data->type=TL_UNLOCK;                       /* Mark killed */
666
807
    /* It's safe to signal the cond first: we're still holding the mutex. */
667
 
    local_data->cond->notify_one();
668
 
    local_data->cond= NULL;                             /* Removed from list */
 
808
    pthread_cond_signal(data->cond);
 
809
    data->cond=0;                               /* Removed from list */
669
810
  }
670
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
811
  for (data=lock->write_wait.data; data ; data=data->next)
671
812
  {
672
 
    local_data->type= TL_UNLOCK;
673
 
    local_data->cond->notify_one();
674
 
    local_data->cond= NULL;
 
813
    data->type=TL_UNLOCK;
 
814
    pthread_cond_signal(data->cond);
 
815
    data->cond=0;
675
816
  }
676
 
  read_wait.last= &read_wait.data;
677
 
  write_wait.last= &write_wait.data;
678
 
  read_wait.data= write_wait.data=0;
679
 
  if (write.data)
680
 
    write.data->type=TL_WRITE_ONLY;
 
817
  lock->read_wait.last= &lock->read_wait.data;
 
818
  lock->write_wait.last= &lock->write_wait.data;
 
819
  lock->read_wait.data=lock->write_wait.data=0;
 
820
  if (upgrade_lock && lock->write.data)
 
821
    lock->write.data->type=TL_WRITE_ONLY;
 
822
  pthread_mutex_unlock(&lock->mutex);
 
823
  return;
681
824
}
682
825
 
683
826
 
687
830
  This is used to abort all locks for a specific thread
688
831
*/
689
832
 
690
 
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
 
833
bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
691
834
{
 
835
  THR_LOCK_DATA *data;
692
836
  bool found= false;
693
837
 
694
 
  boost_unique_lock_t scopedLock(mutex);
695
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
838
  pthread_mutex_lock(&lock->mutex);
 
839
  for (data= lock->read_wait.data; data ; data= data->next)
696
840
  {
697
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
841
    if (data->owner->info->thread_id == thread_id)    /* purecov: tested */
698
842
    {
699
 
      local_data->type= TL_UNLOCK;                      /* Mark killed */
 
843
      data->type= TL_UNLOCK;                    /* Mark killed */
700
844
      /* It's safe to signal the cond first: we're still holding the mutex. */
701
845
      found= true;
702
 
      local_data->cond->notify_one();
703
 
      local_data->cond= 0;                              /* Removed from list */
 
846
      pthread_cond_signal(data->cond);
 
847
      data->cond= 0;                            /* Removed from list */
704
848
 
705
 
      if (((*local_data->prev)= local_data->next))
706
 
        local_data->next->prev= local_data->prev;
 
849
      if (((*data->prev)= data->next))
 
850
        data->next->prev= data->prev;
707
851
      else
708
 
        read_wait.last= local_data->prev;
 
852
        lock->read_wait.last= data->prev;
709
853
    }
710
854
  }
711
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
855
  for (data= lock->write_wait.data; data ; data= data->next)
712
856
  {
713
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
857
    if (data->owner->info->thread_id == thread_id) /* purecov: tested */
714
858
    {
715
 
      local_data->type= TL_UNLOCK;
 
859
      data->type= TL_UNLOCK;
716
860
      found= true;
717
 
      local_data->cond->notify_one();
718
 
      local_data->cond= NULL;
 
861
      pthread_cond_signal(data->cond);
 
862
      data->cond= 0;
719
863
 
720
 
      if (((*local_data->prev)= local_data->next))
721
 
        local_data->next->prev= local_data->prev;
 
864
      if (((*data->prev)= data->next))
 
865
        data->next->prev= data->prev;
722
866
      else
723
 
        write_wait.last= local_data->prev;
724
 
    }
725
 
  }
726
 
  wake_up_waiters(this);
727
 
 
728
 
  return found;
729
 
}
730
 
 
731
 
} /* namespace drizzled */
 
867
        lock->write_wait.last= data->prev;
 
868
    }
 
869
  }
 
870
  wake_up_waiters(lock);
 
871
  pthread_mutex_unlock(&lock->mutex);
 
872
  return(found);
 
873
}
 
874
 
 
875
 
 
876
/*
 
877
  Downgrade a WRITE_* to a lower WRITE level
 
878
  SYNOPSIS
 
879
    thr_downgrade_write_lock()
 
880
    in_data                   Lock data of thread downgrading its lock
 
881
    new_lock_type             New write lock type
 
882
  RETURN VALUE
 
883
    NONE
 
884
  DESCRIPTION
 
885
    This can be used to downgrade a lock already owned. When the downgrade
 
886
    occurs also other waiters, both readers and writers can be allowed to
 
887
    start.
 
888
    The previous lock is often TL_WRITE_ONLY but can also be
 
889
    TL_WRITE and TL_WRITE_ALLOW_READ. The normal downgrade variants are
 
890
    TL_WRITE_ONLY => TL_WRITE_ALLOW_READ After a short exclusive lock
 
891
    TL_WRITE_ALLOW_READ => TL_WRITE_ALLOW_WRITE After discovering that the
 
892
    operation didn't need such a high lock.
 
893
    TL_WRITE_ONLY => TL_WRITE after a short exclusive lock while holding a
 
894
    write table lock
 
895
    TL_WRITE_ONLY => TL_WRITE_ALLOW_WRITE After a short exclusive lock after
 
896
    already earlier having dongraded lock to TL_WRITE_ALLOW_WRITE
 
897
    The implementation is conservative and rather don't start rather than
 
898
    go on unknown paths to start, the common cases are handled.
 
899
 
 
900
    NOTE:
 
901
    In its current implementation it is only allowed to downgrade from
 
902
    TL_WRITE_ONLY. In this case there are no waiters. Thus no wake up
 
903
    logic is required.
 
904
*/
 
905
 
 
906
void thr_downgrade_write_lock(THR_LOCK_DATA *in_data,
 
907
                              enum thr_lock_type new_lock_type)
 
908
{
 
909
  THR_LOCK *lock=in_data->lock;
 
910
 
 
911
  pthread_mutex_lock(&lock->mutex);
 
912
  in_data->type= new_lock_type;
 
913
 
 
914
  pthread_mutex_unlock(&lock->mutex);
 
915
  return;
 
916
}
 
917
 
 
918
/* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */
 
919
 
 
920
bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
 
921
{
 
922
  THR_LOCK *lock=data->lock;
 
923
 
 
924
  pthread_mutex_lock(&lock->mutex);
 
925
  if (data->type == TL_UNLOCK || data->type >= TL_WRITE_LOW_PRIORITY)
 
926
  {
 
927
    pthread_mutex_unlock(&lock->mutex);
 
928
    return(data->type == TL_UNLOCK);    /* Test if Aborted */
 
929
  }
 
930
  /* TODO:  Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */
 
931
  data->type=TL_WRITE;                          /* Upgrade lock */
 
932
 
 
933
  /* Check if someone has given us the lock */
 
934
  if (!data->cond)
 
935
  {
 
936
    if (!lock->read.data)                       /* No read locks */
 
937
    {                                           /* We have the lock */
 
938
      if (data->lock->get_status)
 
939
        (*data->lock->get_status)(data->status_param, 0);
 
940
      pthread_mutex_unlock(&lock->mutex);
 
941
      return(0);
 
942
    }
 
943
 
 
944
    if (((*data->prev)=data->next))             /* remove from lock-list */
 
945
      data->next->prev= data->prev;
 
946
    else
 
947
      lock->write.last=data->prev;
 
948
 
 
949
    if ((data->next=lock->write_wait.data))     /* Put first in lock_list */
 
950
      data->next->prev= &data->next;
 
951
    else
 
952
      lock->write_wait.last= &data->next;
 
953
    data->prev= &lock->write_wait.data;
 
954
    lock->write_wait.data=data;
 
955
  }
 
956
 
 
957
  return(wait_for_lock(&lock->write_wait,data,1));
 
958
}
 
959
 
 
960
 
 
961
/* downgrade a WRITE lock to a WRITE_DELAY lock if there is pending locks */
 
962
 
 
963
bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
 
964
{
 
965
  THR_LOCK *lock=data->lock;
 
966
 
 
967
  pthread_mutex_lock(&lock->mutex);
 
968
  if (!lock->read_wait.data)                    /* No waiting read locks */
 
969
  {
 
970
    pthread_mutex_unlock(&lock->mutex);
 
971
    return(0);
 
972
  }
 
973
 
 
974
  data->type=TL_WRITE_DELAYED;
 
975
  if (lock->update_status)
 
976
    (*lock->update_status)(data->status_param);
 
977
  if (((*data->prev)=data->next))               /* remove from lock-list */
 
978
    data->next->prev= data->prev;
 
979
  else
 
980
    lock->write.last=data->prev;
 
981
 
 
982
  if ((data->next=lock->write_wait.data))       /* Put first in lock_list */
 
983
    data->next->prev= &data->next;
 
984
  else
 
985
    lock->write_wait.last= &data->next;
 
986
  data->prev= &lock->write_wait.data;
 
987
  data->cond=get_cond();                        /* This was zero */
 
988
  lock->write_wait.data=data;
 
989
  free_all_read_locks(lock,0);
 
990
 
 
991
  pthread_mutex_unlock(&lock->mutex);
 
992
  return(thr_upgrade_write_delay_lock(data));
 
993
}