~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to mysys/thr_lock.cc

  • Committer: Jay Pipes
  • Date: 2008-12-18 15:55:03 UTC
  • mto: This revision was merged to the branch mainline in revision 717.
  • Revision ID: jpipes@serialcoder-20081218155503-u45ygyunrdyyvquq
Fix for Bug#308457.  Gave UTF8 enclosure and escape character on LOAD DATA INFILE and changed the error message to be more descriptive

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/*
17
17
Read and write locks for Posix threads. All threads must acquire
26
26
 
27
27
TL_READ                 # Low priority read
28
28
TL_READ_WITH_SHARED_LOCKS
 
29
TL_READ_HIGH_PRIORITY   # High priority read
29
30
TL_READ_NO_INSERT       # Read without concurrent inserts
30
31
TL_WRITE_ALLOW_WRITE    # Write lock that allows other writers
31
32
TL_WRITE_ALLOW_READ     # Write lock, but allow reading
32
33
TL_WRITE_CONCURRENT_INSERT
33
34
                        # Insert that can be mixed with selects
 
35
TL_WRITE_DELAYED        # Used by delayed insert
 
36
                        # Allows lower locks to take over
 
37
TL_WRITE_LOW_PRIORITY   # Low priority write
34
38
TL_WRITE                # High priority write
35
39
TL_WRITE_ONLY           # High priority write
36
40
                        # Abort all new lock requests with an error
46
50
should put a pointer to the following functions in the lock structure:
47
51
(If the pointer is zero (default), the function is not called)
48
52
 
 
53
check_status:
 
54
         Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
 
55
         we check if this function exists and returns 0.
 
56
         If not, then the lock is upgraded to TL_WRITE
 
57
         In MyISAM this is a simple check if the insert can be done
 
58
         at the end of the datafile.
 
59
update_status:
 
60
        Before a write lock is released, this function is called.
 
61
        In MyISAM this function updates the count and length of the datafile
 
62
get_status:
 
63
        When one gets a lock this function is called.
 
64
        In MyISAM this stores the number of rows and size of the datafile
 
65
        for concurrent reads.
49
66
 
50
67
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51
 
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
 
68
TL_WRITE_CONCURRENT_INSERT or one TL_WRITE_DELAYED lock at the same time as
 
69
multiple read locks.
52
70
 
53
71
*/
54
72
 
55
 
#include <config.h>
56
 
#include <drizzled/internal/my_sys.h>
57
 
#include <drizzled/internal/thread_var.h>
58
 
#include <drizzled/statistics_variables.h>
59
 
#include <drizzled/pthread_globals.h>
60
 
 
61
 
#include <drizzled/session.h>
 
73
#include "mysys_priv.h"
62
74
 
63
75
#include "thr_lock.h"
64
 
#include <drizzled/internal/m_string.h>
 
76
#include <mystrings/m_string.h>
65
77
#include <errno.h>
66
 
#include <list>
67
78
 
68
79
#if TIME_WITH_SYS_TIME
69
80
# include <sys/time.h>
78
89
 
79
90
#include <drizzled/util/test.h>
80
91
 
81
 
#include <boost/interprocess/sync/lock_options.hpp>
82
 
 
83
 
using namespace std;
84
 
 
85
 
namespace drizzled
86
 
{
87
 
 
 
92
bool thr_lock_inited=0;
 
93
uint32_t locks_immediate = 0L, locks_waited = 0L;
88
94
uint64_t table_lock_wait_timeout;
89
 
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
90
 
 
91
 
 
92
 
uint64_t max_write_lock_count= UINT64_MAX;
 
95
enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
 
96
 
 
97
 
 
98
LIST *thr_lock_thread_list;                     /* List of threads in use */
 
99
uint64_t max_write_lock_count= ~(uint64_t) 0L;
 
100
 
 
101
static inline pthread_cond_t *get_cond(void)
 
102
{
 
103
  return &my_thread_var->suspend;
 
104
}
93
105
 
94
106
/*
95
107
** For the future (now the thread specific cond is alloced by my_pthread.c)
96
108
*/
97
109
 
 
110
bool init_thr_lock()
 
111
{
 
112
  thr_lock_inited=1;
 
113
  return 0;
 
114
}
 
115
 
98
116
static inline bool
99
117
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
100
118
{
106
124
 
107
125
void thr_lock_init(THR_LOCK *lock)
108
126
{
109
 
  lock->init();
 
127
  memset(lock, 0, sizeof(*lock));
 
128
  pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
110
129
  lock->read.last= &lock->read.data;
111
130
  lock->read_wait.last= &lock->read_wait.data;
112
131
  lock->write_wait.last= &lock->write_wait.data;
113
132
  lock->write.last= &lock->write.data;
114
 
}
115
 
 
116
 
 
117
 
void THR_LOCK_INFO::init()
118
 
{
119
 
  internal::st_my_thread_var *tmp= my_thread_var;
120
 
  thread_id= tmp->id;
121
 
  n_cursors= 0;
 
133
 
 
134
  pthread_mutex_lock(&THR_LOCK_lock);           /* Add to locks in use */
 
135
  lock->list.data=(void*) lock;
 
136
  thr_lock_thread_list=list_add(thr_lock_thread_list,&lock->list);
 
137
  pthread_mutex_unlock(&THR_LOCK_lock);
 
138
  return;
 
139
}
 
140
 
 
141
 
 
142
void thr_lock_delete(THR_LOCK *lock)
 
143
{
 
144
  pthread_mutex_destroy(&lock->mutex);
 
145
  pthread_mutex_lock(&THR_LOCK_lock);
 
146
  thr_lock_thread_list=list_delete(thr_lock_thread_list,&lock->list);
 
147
  pthread_mutex_unlock(&THR_LOCK_lock);
 
148
  return;
 
149
}
 
150
 
 
151
 
 
152
void thr_lock_info_init(THR_LOCK_INFO *info)
 
153
{
 
154
  struct st_my_thread_var *tmp= my_thread_var;
 
155
  info->thread=    tmp->pthread_self;
 
156
  info->thread_id= tmp->id;
 
157
  info->n_cursors= 0;
122
158
}
123
159
 
124
160
        /* Initialize a lock instance */
125
161
 
126
 
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
 
162
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
127
163
{
128
 
  lock= lock_arg;
129
 
  type= TL_UNLOCK;
130
 
  owner= NULL;                               /* no owner yet */
131
 
  status_param= param_arg;
132
 
  cond= NULL;
 
164
  data->lock=lock;
 
165
  data->type=TL_UNLOCK;
 
166
  data->owner= 0;                               /* no owner yet */
 
167
  data->status_param=param;
 
168
  data->cond=0;
133
169
}
134
170
 
135
171
 
139
175
  for ( ; data ; data=data->next)
140
176
  {
141
177
    if (thr_lock_owner_equal(data->owner, owner))
142
 
      return true;                                      /* Already locked by thread */
143
 
  }
144
 
  return false;
145
 
}
 
178
      return 1;                                 /* Already locked by thread */
 
179
  }
 
180
  return 0;
 
181
}
 
182
 
 
183
static inline bool have_specific_lock(THR_LOCK_DATA *data,
 
184
                                         enum thr_lock_type type)
 
185
{
 
186
  for ( ; data ; data=data->next)
 
187
  {
 
188
    if (data->type == type)
 
189
      return 1;
 
190
  }
 
191
  return 0;
 
192
}
 
193
 
146
194
 
147
195
static void wake_up_waiters(THR_LOCK *lock);
148
196
 
149
197
 
150
 
static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
 
198
static enum enum_thr_lock_result
 
199
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
 
200
              bool in_wait_list)
151
201
{
152
 
  internal::st_my_thread_var *thread_var= session.getThreadVar();
153
 
 
154
 
  boost::condition_variable_any *cond= &thread_var->suspend;
 
202
  struct st_my_thread_var *thread_var= my_thread_var;
 
203
  pthread_cond_t *cond= &thread_var->suspend;
 
204
  struct timespec wait_timeout;
155
205
  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
156
206
  bool can_deadlock= test(data->owner->info->n_cursors);
157
207
 
 
208
  if (!in_wait_list)
158
209
  {
159
210
    (*wait->last)=data;                         /* Wait for lock */
160
211
    data->prev= wait->last;
161
212
    wait->last= &data->next;
162
213
  }
163
214
 
164
 
  current_global_counters.locks_waited++;
 
215
  statistic_increment(locks_waited, &THR_LOCK_lock);
165
216
 
166
217
  /* Set up control struct to allow others to abort locks */
167
 
  thread_var->current_mutex= data->lock->native_handle();
168
 
  thread_var->current_cond=  &thread_var->suspend;
169
 
  data->cond= &thread_var->suspend;;
 
218
  thread_var->current_mutex= &data->lock->mutex;
 
219
  thread_var->current_cond=  cond;
 
220
  data->cond= cond;
170
221
 
171
 
  while (not thread_var->abort)
 
222
  if (can_deadlock)
 
223
    set_timespec(wait_timeout, table_lock_wait_timeout);
 
224
  while (!thread_var->abort || in_wait_list)
172
225
  {
173
 
    boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
174
 
 
175
 
    if (can_deadlock)
176
 
    {
177
 
      boost::xtime xt; 
178
 
      xtime_get(&xt, boost::TIME_UTC); 
179
 
      xt.sec += table_lock_wait_timeout; 
180
 
      if (not cond->timed_wait(scoped, xt))
181
 
      {
182
 
        result= THR_LOCK_WAIT_TIMEOUT;
183
 
        scoped.release();
184
 
        break;
185
 
      }
186
 
    }
187
 
    else
188
 
    {
189
 
      cond->wait(scoped);
190
 
    }
 
226
    int rc= (can_deadlock ?
 
227
             pthread_cond_timedwait(cond, &data->lock->mutex,
 
228
                                    &wait_timeout) :
 
229
             pthread_cond_wait(cond, &data->lock->mutex));
191
230
    /*
192
231
      We must break the wait if one of the following occurs:
193
232
      - the connection has been aborted (!thread_var->abort), but
201
240
      Order of checks below is important to not report about timeout
202
241
      if the predicate is true.
203
242
    */
204
 
    if (data->cond == NULL)
205
 
    {
206
 
      scoped.release();
207
 
      break;
208
 
    }
209
 
    scoped.release();
 
243
    if (data->cond == 0)
 
244
    {
 
245
      break;
 
246
    }
 
247
    if (rc == ETIMEDOUT || rc == ETIME)
 
248
    {
 
249
      /* purecov: begin inspected */
 
250
      result= THR_LOCK_WAIT_TIMEOUT;
 
251
      break;
 
252
      /* purecov: end */
 
253
    }
210
254
  }
211
255
  if (data->cond || data->type == TL_UNLOCK)
212
256
  {
223
267
  else
224
268
  {
225
269
    result= THR_LOCK_SUCCESS;
 
270
    if (data->lock->get_status)
 
271
      (*data->lock->get_status)(data->status_param, 0);
226
272
  }
227
 
  data->lock->unlock();
 
273
  pthread_mutex_unlock(&data->lock->mutex);
228
274
 
229
275
  /* The following must be done after unlock of lock->mutex */
230
 
  boost_unique_lock_t scopedLock(thread_var->mutex);
231
 
  thread_var->current_mutex= NULL;
232
 
  thread_var->current_cond= NULL;
 
276
  pthread_mutex_lock(&thread_var->mutex);
 
277
  thread_var->current_mutex= 0;
 
278
  thread_var->current_cond=  0;
 
279
  pthread_mutex_unlock(&thread_var->mutex);
233
280
  return(result);
234
281
}
235
282
 
236
283
 
237
 
static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
 
284
enum enum_thr_lock_result
 
285
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
 
286
         enum thr_lock_type lock_type)
238
287
{
239
 
  THR_LOCK *lock= data->lock;
 
288
  THR_LOCK *lock=data->lock;
240
289
  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
241
290
  struct st_lock_list *wait_queue;
242
291
  THR_LOCK_DATA *lock_owner;
245
294
  data->cond=0;                                 /* safety */
246
295
  data->type=lock_type;
247
296
  data->owner= owner;                           /* Must be reset ! */
248
 
  lock->lock();
 
297
  pthread_mutex_lock(&lock->mutex);
249
298
  if ((int) lock_type <= (int) TL_READ_NO_INSERT)
250
299
  {
251
300
    /* Request for READ lock */
261
310
      */
262
311
 
263
312
      if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
264
 
          (lock->write.data->type <= TL_WRITE_CONCURRENT_INSERT &&
265
 
           (((int) lock_type <= (int) TL_READ_WITH_SHARED_LOCKS) ||
 
313
          (lock->write.data->type <= TL_WRITE_DELAYED &&
 
314
           (((int) lock_type <= (int) TL_READ_HIGH_PRIORITY) ||
266
315
            (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT &&
267
316
             lock->write.data->type != TL_WRITE_ALLOW_READ))))
268
317
      {                                         /* Already got a write lock */
271
320
        lock->read.last= &data->next;
272
321
        if (lock_type == TL_READ_NO_INSERT)
273
322
          lock->read_no_write_count++;
274
 
        current_global_counters.locks_immediate++;
 
323
        if (lock->get_status)
 
324
          (*lock->get_status)(data->status_param, 0);
 
325
        statistic_increment(locks_immediate,&THR_LOCK_lock);
275
326
        goto end;
276
327
      }
277
328
      if (lock->write.data->type == TL_WRITE_ONLY)
283
334
      }
284
335
    }
285
336
    else if (!lock->write_wait.data ||
286
 
             lock->write_wait.data->type <= TL_WRITE_DEFAULT ||
 
337
             lock->write_wait.data->type <= TL_WRITE_LOW_PRIORITY ||
 
338
             lock_type == TL_READ_HIGH_PRIORITY ||
287
339
             have_old_read_lock(lock->read.data, data->owner))
288
340
    {                                           /* No important write-locks */
289
341
      (*lock->read.last)=data;                  /* Add to running FIFO */
290
342
      data->prev=lock->read.last;
291
343
      lock->read.last= &data->next;
 
344
      if (lock->get_status)
 
345
        (*lock->get_status)(data->status_param, 0);
292
346
      if (lock_type == TL_READ_NO_INSERT)
293
347
        lock->read_no_write_count++;
294
 
      current_global_counters.locks_immediate++;
 
348
      statistic_increment(locks_immediate,&THR_LOCK_lock);
295
349
      goto end;
296
350
    }
297
351
    /*
303
357
  }
304
358
  else                                          /* Request for WRITE lock */
305
359
  {
306
 
    if (lock_type == TL_WRITE_CONCURRENT_INSERT)
 
360
    if (lock_type == TL_WRITE_DELAYED)
 
361
    {
 
362
      if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY)
 
363
      {
 
364
        data->type=TL_UNLOCK;
 
365
        result= THR_LOCK_ABORTED;               /* Can't wait for this one */
 
366
        goto end;
 
367
      }
 
368
      /*
 
369
        if there is a TL_WRITE_ALLOW_READ lock, we have to wait for a lock
 
370
        (TL_WRITE_ALLOW_READ is used for ALTER TABLE in MySQL)
 
371
      */
 
372
      if ((!lock->write.data ||
 
373
           lock->write.data->type != TL_WRITE_ALLOW_READ) &&
 
374
          !have_specific_lock(lock->write_wait.data,TL_WRITE_ALLOW_READ) &&
 
375
          (lock->write.data || lock->read.data))
 
376
      {
 
377
        /* Add delayed write lock to write_wait queue, and return at once */
 
378
        (*lock->write_wait.last)=data;
 
379
        data->prev=lock->write_wait.last;
 
380
        lock->write_wait.last= &data->next;
 
381
        data->cond=get_cond();
 
382
        /*
 
383
          We don't have to do get_status here as we will do it when we change
 
384
          the delayed lock to a real write lock
 
385
        */
 
386
        statistic_increment(locks_immediate,&THR_LOCK_lock);
 
387
        goto end;
 
388
      }
 
389
    }
 
390
    else if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
307
391
      data->type=lock_type= thr_upgraded_concurrent_insert_lock;
308
392
 
309
393
    if (lock->write.data)                       /* If there is a write lock */
338
422
        (*lock->write.last)=data;       /* Add to running fifo */
339
423
        data->prev=lock->write.last;
340
424
        lock->write.last= &data->next;
341
 
        current_global_counters.locks_immediate++;
 
425
        if (data->lock->get_status)
 
426
          (*data->lock->get_status)(data->status_param, 0);
 
427
        statistic_increment(locks_immediate,&THR_LOCK_lock);
342
428
        goto end;
343
429
      }
344
430
    }
350
436
        if (lock_type == TL_WRITE_CONCURRENT_INSERT)
351
437
        {
352
438
          concurrent_insert= 1;
 
439
          if ((*lock->check_status)(data->status_param))
 
440
          {
 
441
            concurrent_insert= 0;
 
442
            data->type=lock_type= thr_upgraded_concurrent_insert_lock;
 
443
          }
353
444
        }
354
445
 
355
446
        if (!lock->read.data ||
356
 
            (lock_type <= TL_WRITE_CONCURRENT_INSERT &&
 
447
            (lock_type <= TL_WRITE_DELAYED &&
357
448
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
358
449
               lock_type != TL_WRITE_ALLOW_WRITE) ||
359
450
              !lock->read_no_write_count)))
361
452
          (*lock->write.last)=data;             /* Add as current write lock */
362
453
          data->prev=lock->write.last;
363
454
          lock->write.last= &data->next;
364
 
          current_global_counters.locks_immediate++;
 
455
          if (data->lock->get_status)
 
456
            (*data->lock->get_status)(data->status_param, concurrent_insert);
 
457
          statistic_increment(locks_immediate,&THR_LOCK_lock);
365
458
          goto end;
366
459
        }
367
460
      }
380
473
    result= THR_LOCK_DEADLOCK;
381
474
    goto end;
382
475
  }
383
 
 
384
476
  /* Can't get lock yet;  Wait for it */
385
 
  return(wait_for_lock(session, wait_queue, data));
 
477
  return(wait_for_lock(wait_queue, data, 0));
386
478
end:
387
 
  lock->unlock();
388
 
 
 
479
  pthread_mutex_unlock(&lock->mutex);
389
480
  return(result);
390
481
}
391
482
 
392
483
 
393
 
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
 
484
static inline void free_all_read_locks(THR_LOCK *lock,
 
485
                                       bool using_concurrent_insert)
394
486
{
395
 
  THR_LOCK_DATA *data= lock->read_wait.data;
 
487
  THR_LOCK_DATA *data=lock->read_wait.data;
396
488
 
397
489
  /* move all locks from read_wait list to read list */
398
490
  (*lock->read.last)=data;
404
496
 
405
497
  do
406
498
  {
407
 
    boost::condition_variable_any *cond= data->cond;
 
499
    pthread_cond_t *cond=data->cond;
408
500
    if ((int) data->type == (int) TL_READ_NO_INSERT)
409
501
    {
410
502
      if (using_concurrent_insert)
424
516
      }
425
517
      lock->read_no_write_count++;
426
518
    }
427
 
    data->cond= NULL;                           /* Mark thread free */
428
 
    cond->notify_one();
 
519
    data->cond=0;                               /* Mark thread free */
 
520
    pthread_cond_signal(cond);
429
521
  } while ((data=data->next));
430
522
  *lock->read_wait.last=0;
431
523
  if (!lock->read_wait.data)
432
524
    lock->write_lock_count=0;
433
525
}
434
526
 
435
 
/* Unlock lock and free next thread on same lock */
 
527
        /* Unlock lock and free next thread on same lock */
436
528
 
437
 
static void thr_unlock(THR_LOCK_DATA *data)
 
529
void thr_unlock(THR_LOCK_DATA *data)
438
530
{
439
531
  THR_LOCK *lock=data->lock;
440
532
  enum thr_lock_type lock_type=data->type;
441
 
  lock->lock();
 
533
  pthread_mutex_lock(&lock->mutex);
442
534
 
443
535
  if (((*data->prev)=data->next))               /* remove from lock-list */
444
536
    data->next->prev= data->prev;
445
537
  else if (lock_type <= TL_READ_NO_INSERT)
446
538
    lock->read.last=data->prev;
 
539
  else if (lock_type == TL_WRITE_DELAYED && data->cond)
 
540
  {
 
541
    /*
 
542
      This only happens in extreme circumstances when a
 
543
      write delayed lock that is waiting for a lock
 
544
    */
 
545
    lock->write_wait.last=data->prev;           /* Put it on wait queue */
 
546
  }
447
547
  else
448
548
    lock->write.last=data->prev;
449
549
  if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
450
 
  { }
 
550
  {
 
551
    if (lock->update_status)
 
552
      (*lock->update_status)(data->status_param);
 
553
  }
451
554
  else
452
 
  { }
 
555
  {
 
556
    if (lock->restore_status)
 
557
      (*lock->restore_status)(data->status_param);
 
558
  }
453
559
  if (lock_type == TL_READ_NO_INSERT)
454
560
    lock->read_no_write_count--;
455
561
  data->type=TL_UNLOCK;                         /* Mark unlocked */
456
562
  wake_up_waiters(lock);
457
 
  lock->unlock();
 
563
  pthread_mutex_unlock(&lock->mutex);
 
564
  return;
458
565
}
459
566
 
460
567
 
478
585
    {
479
586
      /* Release write-locks with TL_WRITE or TL_WRITE_ONLY priority first */
480
587
      if (data &&
481
 
          (!lock->read_wait.data || lock->read_wait.data->type <= TL_READ_WITH_SHARED_LOCKS))
 
588
          (data->type != TL_WRITE_LOW_PRIORITY || !lock->read_wait.data ||
 
589
           lock->read_wait.data->type < TL_READ_HIGH_PRIORITY))
482
590
      {
483
591
        if (lock->write_lock_count++ > max_write_lock_count)
484
592
        {
500
608
          data->prev=lock->write.last;
501
609
          data->next=0;
502
610
          lock->write.last= &data->next;
503
 
 
 
611
          if (data->type == TL_WRITE_CONCURRENT_INSERT &&
 
612
              (*lock->check_status)(data->status_param))
 
613
            data->type=TL_WRITE;                        /* Upgrade lock */
504
614
          {
505
 
            boost::condition_variable_any *cond= data->cond;
506
 
            data->cond= NULL;                           /* Mark thread free */
507
 
            cond->notify_one(); /* Start waiting thred */
 
615
            pthread_cond_t *cond=data->cond;
 
616
            data->cond=0;                               /* Mark thread free */
 
617
            pthread_cond_signal(cond);  /* Start waiting thread */
508
618
          }
509
619
          if (data->type != TL_WRITE_ALLOW_WRITE ||
510
620
              !lock->write_wait.data ||
512
622
            break;
513
623
          data=lock->write_wait.data;           /* Free this too */
514
624
        }
515
 
        if (data->type >= TL_WRITE)
 
625
        if (data->type >= TL_WRITE_LOW_PRIORITY)
516
626
          goto end;
517
627
        /* Release possible read locks together with the write lock */
518
628
      }
523
633
                             data->type == TL_WRITE_ALLOW_WRITE));
524
634
    }
525
635
    else if (data &&
526
 
             (lock_type=data->type) <= TL_WRITE_CONCURRENT_INSERT &&
 
636
             (lock_type=data->type) <= TL_WRITE_DELAYED &&
527
637
             ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
528
638
               lock_type != TL_WRITE_ALLOW_WRITE) ||
529
639
              !lock->read_no_write_count))
530
640
    {
 
641
      /*
 
642
        For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
 
643
        start WRITE locks together with the READ locks
 
644
      */
 
645
      if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
 
646
          (*lock->check_status)(data->status_param))
 
647
      {
 
648
        data->type=TL_WRITE;                    /* Upgrade lock */
 
649
        if (lock->read_wait.data)
 
650
          free_all_read_locks(lock,0);
 
651
        goto end;
 
652
      }
531
653
      do {
532
 
        boost::condition_variable_any *cond= data->cond;
 
654
        pthread_cond_t *cond=data->cond;
533
655
        if (((*data->prev)=data->next))         /* remove from wait-list */
534
656
          data->next->prev= data->prev;
535
657
        else
538
660
        data->prev=lock->write.last;
539
661
        lock->write.last= &data->next;
540
662
        data->next=0;                           /* Only one write lock */
541
 
        data->cond= NULL;                               /* Mark thread free */
542
 
        cond->notify_one(); /* Start waiting thread */
 
663
        data->cond=0;                           /* Mark thread free */
 
664
        pthread_cond_signal(cond);      /* Start waiting thread */
543
665
      } while (lock_type == TL_WRITE_ALLOW_WRITE &&
544
666
               (data=lock->write_wait.data) &&
545
667
               data->type == TL_WRITE_ALLOW_WRITE);
549
671
                             lock_type == TL_WRITE_ALLOW_WRITE));
550
672
    }
551
673
    else if (!data && lock->read_wait.data)
552
 
    {
553
674
      free_all_read_locks(lock,0);
554
 
    }
555
675
  }
556
676
end:
557
677
  return;
565
685
*/
566
686
 
567
687
 
568
 
#define LOCK_CMP(A,B) ((unsigned char*) (A->lock) - (uint32_t) ((A)->type) < (unsigned char*) (B->lock)- (uint32_t) ((B)->type))
 
688
#define LOCK_CMP(A,B) ((unsigned char*) (A->lock) - (uint) ((A)->type) < (unsigned char*) (B->lock)- (uint) ((B)->type))
569
689
 
570
690
static void sort_locks(THR_LOCK_DATA **data,uint32_t count)
571
691
{
589
709
 
590
710
 
591
711
enum enum_thr_lock_result
592
 
thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
 
712
thr_multi_lock(THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
593
713
{
594
714
  THR_LOCK_DATA **pos,**end;
595
715
  if (count > 1)
597
717
  /* lock everything */
598
718
  for (pos=data,end=data+count; pos < end ; pos++)
599
719
  {
600
 
    enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
 
720
    enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type);
601
721
    if (result != THR_LOCK_SUCCESS)
602
722
    {                                           /* Aborted */
603
 
      thr_multi_unlock(data,(uint32_t) (pos-data));
 
723
      thr_multi_unlock(data,(uint) (pos-data));
604
724
      return(result);
605
725
    }
606
726
  }
617
737
    do
618
738
    {
619
739
      pos--;
620
 
      last_lock=(*pos);
 
740
      if (last_lock->lock == (*pos)->lock &&
 
741
          last_lock->lock->copy_status)
 
742
      {
 
743
        if (last_lock->type <= TL_READ_NO_INSERT)
 
744
        {
 
745
          THR_LOCK_DATA **read_lock;
 
746
          /*
 
747
            If we are locking the same table with read locks we must ensure
 
748
            that all tables share the status of the last write lock or
 
749
            the same read lock.
 
750
          */
 
751
          for (;
 
752
               (*pos)->type <= TL_READ_NO_INSERT &&
 
753
                 pos != data &&
 
754
                 pos[-1]->lock == (*pos)->lock ;
 
755
               pos--) ;
 
756
 
 
757
          read_lock = pos+1;
 
758
          do
 
759
          {
 
760
            (last_lock->lock->copy_status)((*read_lock)->status_param,
 
761
                                           (*pos)->status_param);
 
762
          } while (*(read_lock++) != last_lock);
 
763
          last_lock= (*pos);                    /* Point at last write lock */
 
764
        }
 
765
        else
 
766
          (*last_lock->lock->copy_status)((*pos)->status_param,
 
767
                                          last_lock->status_param);
 
768
      }
 
769
      else
 
770
        last_lock=(*pos);
621
771
    } while (pos != data);
622
772
  }
623
773
#endif
638
788
  return;
639
789
}
640
790
 
641
 
void DrizzleLock::unlock(uint32_t count)
642
 
{
643
 
  THR_LOCK_DATA **pos,**end;
644
 
 
645
 
  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
646
 
  {
647
 
    if ((*pos)->type != TL_UNLOCK)
648
 
      thr_unlock(*pos);
649
 
  }
650
 
}
651
 
 
652
791
/*
653
792
  Abort all threads waiting for a lock. The lock will be upgraded to
654
793
  TL_WRITE_ONLY to abort any new accesses to the lock
655
794
*/
656
795
 
657
 
void THR_LOCK::abort_locks()
 
796
void thr_abort_locks(THR_LOCK *lock, bool upgrade_lock)
658
797
{
659
 
  boost_unique_lock_t scopedLock(mutex);
 
798
  THR_LOCK_DATA *data;
 
799
  pthread_mutex_lock(&lock->mutex);
660
800
 
661
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
801
  for (data=lock->read_wait.data; data ; data=data->next)
662
802
  {
663
 
    local_data->type= TL_UNLOCK;                        /* Mark killed */
 
803
    data->type=TL_UNLOCK;                       /* Mark killed */
664
804
    /* It's safe to signal the cond first: we're still holding the mutex. */
665
 
    local_data->cond->notify_one();
666
 
    local_data->cond= NULL;                             /* Removed from list */
 
805
    pthread_cond_signal(data->cond);
 
806
    data->cond=0;                               /* Removed from list */
667
807
  }
668
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
808
  for (data=lock->write_wait.data; data ; data=data->next)
669
809
  {
670
 
    local_data->type= TL_UNLOCK;
671
 
    local_data->cond->notify_one();
672
 
    local_data->cond= NULL;
 
810
    data->type=TL_UNLOCK;
 
811
    pthread_cond_signal(data->cond);
 
812
    data->cond=0;
673
813
  }
674
 
  read_wait.last= &read_wait.data;
675
 
  write_wait.last= &write_wait.data;
676
 
  read_wait.data= write_wait.data=0;
677
 
  if (write.data)
678
 
    write.data->type=TL_WRITE_ONLY;
 
814
  lock->read_wait.last= &lock->read_wait.data;
 
815
  lock->write_wait.last= &lock->write_wait.data;
 
816
  lock->read_wait.data=lock->write_wait.data=0;
 
817
  if (upgrade_lock && lock->write.data)
 
818
    lock->write.data->type=TL_WRITE_ONLY;
 
819
  pthread_mutex_unlock(&lock->mutex);
 
820
  return;
679
821
}
680
822
 
681
823
 
685
827
  This is used to abort all locks for a specific thread
686
828
*/
687
829
 
688
 
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
 
830
bool thr_abort_locks_for_thread(THR_LOCK *lock, my_thread_id thread_id)
689
831
{
 
832
  THR_LOCK_DATA *data;
690
833
  bool found= false;
691
834
 
692
 
  boost_unique_lock_t scopedLock(mutex);
693
 
  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
 
835
  pthread_mutex_lock(&lock->mutex);
 
836
  for (data= lock->read_wait.data; data ; data= data->next)
694
837
  {
695
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
838
    if (data->owner->info->thread_id == thread_id)    /* purecov: tested */
696
839
    {
697
 
      local_data->type= TL_UNLOCK;                      /* Mark killed */
 
840
      data->type= TL_UNLOCK;                    /* Mark killed */
698
841
      /* It's safe to signal the cond first: we're still holding the mutex. */
699
842
      found= true;
700
 
      local_data->cond->notify_one();
701
 
      local_data->cond= 0;                              /* Removed from list */
 
843
      pthread_cond_signal(data->cond);
 
844
      data->cond= 0;                            /* Removed from list */
702
845
 
703
 
      if (((*local_data->prev)= local_data->next))
704
 
        local_data->next->prev= local_data->prev;
 
846
      if (((*data->prev)= data->next))
 
847
        data->next->prev= data->prev;
705
848
      else
706
 
        read_wait.last= local_data->prev;
 
849
        lock->read_wait.last= data->prev;
707
850
    }
708
851
  }
709
 
  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
 
852
  for (data= lock->write_wait.data; data ; data= data->next)
710
853
  {
711
 
    if (local_data->owner->info->thread_id == thread_id_arg)
 
854
    if (data->owner->info->thread_id == thread_id) /* purecov: tested */
712
855
    {
713
 
      local_data->type= TL_UNLOCK;
 
856
      data->type= TL_UNLOCK;
714
857
      found= true;
715
 
      local_data->cond->notify_one();
716
 
      local_data->cond= NULL;
 
858
      pthread_cond_signal(data->cond);
 
859
      data->cond= 0;
717
860
 
718
 
      if (((*local_data->prev)= local_data->next))
719
 
        local_data->next->prev= local_data->prev;
 
861
      if (((*data->prev)= data->next))
 
862
        data->next->prev= data->prev;
720
863
      else
721
 
        write_wait.last= local_data->prev;
722
 
    }
723
 
  }
724
 
  wake_up_waiters(this);
725
 
 
726
 
  return found;
727
 
}
728
 
 
729
 
} /* namespace drizzled */
 
864
        lock->write_wait.last= data->prev;
 
865
    }
 
866
  }
 
867
  wake_up_waiters(lock);
 
868
  pthread_mutex_unlock(&lock->mutex);
 
869
  return(found);
 
870
}
 
871
 
 
872
 
 
873
/*
 
874
  Downgrade a WRITE_* to a lower WRITE level
 
875
  SYNOPSIS
 
876
    thr_downgrade_write_lock()
 
877
    in_data                   Lock data of thread downgrading its lock
 
878
    new_lock_type             New write lock type
 
879
  RETURN VALUE
 
880
    NONE
 
881
  DESCRIPTION
 
882
    This can be used to downgrade a lock already owned. When the downgrade
 
883
    occurs, other waiters (both readers and writers) can also be allowed to
 
884
    start.
 
885
    The previous lock is often TL_WRITE_ONLY but can also be
 
886
    TL_WRITE and TL_WRITE_ALLOW_READ. The normal downgrade variants are
 
887
    TL_WRITE_ONLY => TL_WRITE_ALLOW_READ After a short exclusive lock
 
888
    TL_WRITE_ALLOW_READ => TL_WRITE_ALLOW_WRITE After discovering that the
 
889
    operation didn't need such a high lock.
 
890
    TL_WRITE_ONLY => TL_WRITE after a short exclusive lock while holding a
 
891
    write table lock
 
892
    TL_WRITE_ONLY => TL_WRITE_ALLOW_WRITE After a short exclusive lock after
 
893
    having earlier downgraded the lock to TL_WRITE_ALLOW_WRITE
 
894
    The implementation is conservative and would rather not start waiters than
 
895
    go on unknown paths to start, the common cases are handled.
 
896
 
 
897
    NOTE:
 
898
    In its current implementation it is only allowed to downgrade from
 
899
    TL_WRITE_ONLY. In this case there are no waiters. Thus no wake up
 
900
    logic is required.
 
901
*/
 
902
 
 
903
void thr_downgrade_write_lock(THR_LOCK_DATA *in_data,
 
904
                              enum thr_lock_type new_lock_type)
 
905
{
 
906
  THR_LOCK *lock=in_data->lock;
 
907
 
 
908
  pthread_mutex_lock(&lock->mutex);
 
909
  in_data->type= new_lock_type;
 
910
 
 
911
  pthread_mutex_unlock(&lock->mutex);
 
912
  return;
 
913
}
 
914
 
 
915
/* Upgrade a WRITE_DELAY lock to a WRITE_LOCK */
 
916
 
 
917
bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data)
 
918
{
 
919
  THR_LOCK *lock=data->lock;
 
920
 
 
921
  pthread_mutex_lock(&lock->mutex);
 
922
  if (data->type == TL_UNLOCK || data->type >= TL_WRITE_LOW_PRIORITY)
 
923
  {
 
924
    pthread_mutex_unlock(&lock->mutex);
 
925
    return(data->type == TL_UNLOCK);    /* Test if Aborted */
 
926
  }
 
927
  /* TODO:  Upgrade to TL_WRITE_CONCURRENT_INSERT in some cases */
 
928
  data->type=TL_WRITE;                          /* Upgrade lock */
 
929
 
 
930
  /* Check if someone has given us the lock */
 
931
  if (!data->cond)
 
932
  {
 
933
    if (!lock->read.data)                       /* No read locks */
 
934
    {                                           /* We have the lock */
 
935
      if (data->lock->get_status)
 
936
        (*data->lock->get_status)(data->status_param, 0);
 
937
      pthread_mutex_unlock(&lock->mutex);
 
938
      return(0);
 
939
    }
 
940
 
 
941
    if (((*data->prev)=data->next))             /* remove from lock-list */
 
942
      data->next->prev= data->prev;
 
943
    else
 
944
      lock->write.last=data->prev;
 
945
 
 
946
    if ((data->next=lock->write_wait.data))     /* Put first in lock_list */
 
947
      data->next->prev= &data->next;
 
948
    else
 
949
      lock->write_wait.last= &data->next;
 
950
    data->prev= &lock->write_wait.data;
 
951
    lock->write_wait.data=data;
 
952
  }
 
953
 
 
954
  return(wait_for_lock(&lock->write_wait,data,1));
 
955
}
 
956
 
 
957
 
 
958
/* Downgrade a WRITE lock to a WRITE_DELAY lock if there are pending locks */
 
959
 
 
960
bool thr_reschedule_write_lock(THR_LOCK_DATA *data)
 
961
{
 
962
  THR_LOCK *lock=data->lock;
 
963
 
 
964
  pthread_mutex_lock(&lock->mutex);
 
965
  if (!lock->read_wait.data)                    /* No waiting read locks */
 
966
  {
 
967
    pthread_mutex_unlock(&lock->mutex);
 
968
    return(0);
 
969
  }
 
970
 
 
971
  data->type=TL_WRITE_DELAYED;
 
972
  if (lock->update_status)
 
973
    (*lock->update_status)(data->status_param);
 
974
  if (((*data->prev)=data->next))               /* remove from lock-list */
 
975
    data->next->prev= data->prev;
 
976
  else
 
977
    lock->write.last=data->prev;
 
978
 
 
979
  if ((data->next=lock->write_wait.data))       /* Put first in lock_list */
 
980
    data->next->prev= &data->next;
 
981
  else
 
982
    lock->write_wait.last= &data->next;
 
983
  data->prev= &lock->write_wait.data;
 
984
  data->cond=get_cond();                        /* This was zero */
 
985
  lock->write_wait.data=data;
 
986
  free_all_read_locks(lock,0);
 
987
 
 
988
  pthread_mutex_unlock(&lock->mutex);
 
989
  return(thr_upgrade_write_delay_lock(data));
 
990
}