~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to storage/innobase/sync/sync0arr.c

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/******************************************************
 
2
The wait array used in synchronization primitives
 
3
 
 
4
(c) 1995 Innobase Oy
 
5
 
 
6
Created 9/5/1995 Heikki Tuuri
 
7
*******************************************************/
 
8
 
 
9
#include "sync0arr.h"
 
10
#ifdef UNIV_NONINL
 
11
#include "sync0arr.ic"
 
12
#endif
 
13
 
 
14
#include "sync0sync.h"
 
15
#include "sync0rw.h"
 
16
#include "os0sync.h"
 
17
#include "os0file.h"
 
18
#include "srv0srv.h"
 
19
 
 
20
/*
 
21
                        WAIT ARRAY
 
22
                        ==========
 
23
 
 
24
The wait array consists of cells each of which has an
 
25
an operating system event object created for it. The threads
 
26
waiting for a mutex, for example, can reserve a cell
 
27
in the array and suspend themselves to wait for the event
 
28
to become signaled. When using the wait array, remember to make
 
29
sure that some thread holding the synchronization object
 
30
will eventually know that there is a waiter in the array and
 
31
signal the object, to prevent infinite wait.
 
32
Why we chose to implement a wait array? First, to make
 
33
mutexes fast, we had to code our own implementation of them,
 
34
which only in usually uncommon cases resorts to using
 
35
slow operating system primitives. Then we had the choice of
 
36
assigning a unique OS event for each mutex, which would
 
37
be simpler, or using a global wait array. In some operating systems,
 
38
the global wait array solution is more efficient and flexible,
 
39
because we can do with a very small number of OS events,
 
40
say 200. In NT 3.51, allocating events seems to be a quadratic
 
41
algorithm, because 10 000 events are created fast, but
 
42
100 000 events takes a couple of minutes to create.
 
43
*/
 
44
 
 
45
/* A cell where an individual thread may wait suspended
 
46
until a resource is released. The suspending is implemented
 
47
using an operating system event semaphore. */
 
48
struct sync_cell_struct {
 
49
        /* State of the cell. SC_WAKING_UP means
 
50
        sync_array_struct->n_reserved has been decremented, but the thread
 
51
        in this cell has not waken up yet. When it does, it will set the
 
52
        state to SC_FREE. Note that this is done without the protection of
 
53
        any mutex. */
 
54
        enum { SC_FREE, SC_RESERVED, SC_WAKING_UP } state;
 
55
 
 
56
        void*           wait_object;    /* pointer to the object the
 
57
                                        thread is waiting for; this is not
 
58
                                        reseted to NULL when a cell is
 
59
                                        freed. */
 
60
 
 
61
        mutex_t*        old_wait_mutex; /* the latest wait mutex in cell */
 
62
        rw_lock_t*      old_wait_rw_lock;/* the latest wait rw-lock in cell */
 
63
        ulint           request_type;   /* lock type requested on the
 
64
                                        object */
 
65
        const char*     file;           /* in debug version file where
 
66
                                        requested */
 
67
        ulint           line;           /* in debug version line where
 
68
                                        requested */
 
69
        os_thread_id_t  thread;         /* thread id of this waiting
 
70
                                        thread */
 
71
        ibool           waiting;        /* TRUE if the thread has already
 
72
                                        called sync_array_event_wait
 
73
                                        on this cell */
 
74
        ibool           event_set;      /* TRUE if the event is set */
 
75
        os_event_t      event;          /* operating system event
 
76
                                        semaphore handle */
 
77
        time_t          reservation_time;/* time when the thread reserved
 
78
                                        the wait cell */
 
79
};
 
80
 
 
81
struct sync_array_struct {
 
82
        ulint           n_reserved;     /* number of currently reserved
 
83
                                        cells in the wait array */
 
84
        ulint           n_cells;        /* number of cells in the
 
85
                                        wait array */
 
86
        sync_cell_t*    array;          /* pointer to wait array */
 
87
        ulint           protection;     /* this flag tells which
 
88
                                        mutex protects the data */
 
89
        mutex_t         mutex;          /* possible database mutex
 
90
                                        protecting this data structure */
 
91
        os_mutex_t      os_mutex;       /* Possible operating system mutex
 
92
                                        protecting the data structure.
 
93
                                        As this data structure is used in
 
94
                                        constructing the database mutex,
 
95
                                        to prevent infinite recursion
 
96
                                        in implementation, we fall back to
 
97
                                        an OS mutex. */
 
98
        ulint           sg_count;       /* count of how many times an
 
99
                                        object has been signalled */
 
100
        ulint           res_count;      /* count of cell reservations
 
101
                                        since creation of the array */
 
102
};
 
103
 
 
104
#ifdef UNIV_SYNC_DEBUG
 
105
/**********************************************************************
 
106
This function is called only in the debug version. Detects a deadlock
 
107
of one or more threads because of waits of semaphores. */
 
108
static
 
109
ibool
 
110
sync_array_detect_deadlock(
 
111
/*=======================*/
 
112
                                /* out: TRUE if deadlock detected */
 
113
        sync_array_t*   arr,    /* in: wait array; NOTE! the caller must
 
114
                                own the mutex to array */
 
115
        sync_cell_t*    start,  /* in: cell where recursive search started */
 
116
        sync_cell_t*    cell,   /* in: cell to search */
 
117
        ulint           depth); /* in: recursion depth */
 
118
#endif /* UNIV_SYNC_DEBUG */
 
119
 
 
120
/*********************************************************************
 
121
Gets the nth cell in array. */
 
122
static
 
123
sync_cell_t*
 
124
sync_array_get_nth_cell(
 
125
/*====================*/
 
126
                                /* out: cell */
 
127
        sync_array_t*   arr,    /* in: sync array */
 
128
        ulint           n)      /* in: index */
 
129
{
 
130
        ut_a(arr);
 
131
        ut_a(n < arr->n_cells);
 
132
 
 
133
        return(arr->array + n);
 
134
}
 
135
 
 
136
/**********************************************************************
 
137
Reserves the mutex semaphore protecting a sync array. */
 
138
static
 
139
void
 
140
sync_array_enter(
 
141
/*=============*/
 
142
        sync_array_t*   arr)    /* in: sync wait array */
 
143
{
 
144
        ulint   protection;
 
145
 
 
146
        protection = arr->protection;
 
147
 
 
148
        if (protection == SYNC_ARRAY_OS_MUTEX) {
 
149
                os_mutex_enter(arr->os_mutex);
 
150
        } else if (protection == SYNC_ARRAY_MUTEX) {
 
151
                mutex_enter(&(arr->mutex));
 
152
        } else {
 
153
                ut_error;
 
154
        }
 
155
}
 
156
 
 
157
/**********************************************************************
 
158
Releases the mutex semaphore protecting a sync array. */
 
159
static
 
160
void
 
161
sync_array_exit(
 
162
/*============*/
 
163
        sync_array_t*   arr)    /* in: sync wait array */
 
164
{
 
165
        ulint   protection;
 
166
 
 
167
        protection = arr->protection;
 
168
 
 
169
        if (protection == SYNC_ARRAY_OS_MUTEX) {
 
170
                os_mutex_exit(arr->os_mutex);
 
171
        } else if (protection == SYNC_ARRAY_MUTEX) {
 
172
                mutex_exit(&(arr->mutex));
 
173
        } else {
 
174
                ut_error;
 
175
        }
 
176
}
 
177
 
 
178
/***********************************************************************
 
179
Creates a synchronization wait array. It is protected by a mutex
 
180
which is automatically reserved when the functions operating on it
 
181
are called. */
 
182
 
 
183
sync_array_t*
 
184
sync_array_create(
 
185
/*==============*/
 
186
                                /* out, own: created wait array */
 
187
        ulint   n_cells,        /* in: number of cells in the array
 
188
                                to create */
 
189
        ulint   protection)     /* in: either SYNC_ARRAY_OS_MUTEX or
 
190
                                SYNC_ARRAY_MUTEX: determines the type
 
191
                                of mutex protecting the data structure */
 
192
{
 
193
        sync_array_t*   arr;
 
194
        sync_cell_t*    cell_array;
 
195
        sync_cell_t*    cell;
 
196
        ulint           i;
 
197
 
 
198
        ut_a(n_cells > 0);
 
199
 
 
200
        /* Allocate memory for the data structures */
 
201
        arr = ut_malloc(sizeof(sync_array_t));
 
202
 
 
203
        cell_array = ut_malloc(sizeof(sync_cell_t) * n_cells);
 
204
 
 
205
        arr->n_cells = n_cells;
 
206
        arr->n_reserved = 0;
 
207
        arr->array = cell_array;
 
208
        arr->protection = protection;
 
209
        arr->sg_count = 0;
 
210
        arr->res_count = 0;
 
211
 
 
212
        /* Then create the mutex to protect the wait array complex */
 
213
        if (protection == SYNC_ARRAY_OS_MUTEX) {
 
214
                arr->os_mutex = os_mutex_create(NULL);
 
215
        } else if (protection == SYNC_ARRAY_MUTEX) {
 
216
                mutex_create(&arr->mutex, SYNC_NO_ORDER_CHECK);
 
217
        } else {
 
218
                ut_error;
 
219
        }
 
220
 
 
221
        for (i = 0; i < n_cells; i++) {
 
222
                cell = sync_array_get_nth_cell(arr, i);
 
223
                cell->state = SC_FREE;
 
224
                cell->wait_object = NULL;
 
225
 
 
226
                /* Create an operating system event semaphore with no name */
 
227
                cell->event = os_event_create(NULL);
 
228
                cell->event_set = FALSE; /* it is created in reset state */
 
229
        }
 
230
 
 
231
        return(arr);
 
232
}
 
233
 
 
234
/**********************************************************************
 
235
Frees the resources in a wait array. */
 
236
 
 
237
void
 
238
sync_array_free(
 
239
/*============*/
 
240
        sync_array_t*   arr)    /* in, own: sync wait array */
 
241
{
 
242
        ulint           i;
 
243
        sync_cell_t*    cell;
 
244
        ulint           protection;
 
245
 
 
246
        ut_a(arr->n_reserved == 0);
 
247
 
 
248
        sync_array_validate(arr);
 
249
 
 
250
        for (i = 0; i < arr->n_cells; i++) {
 
251
                cell = sync_array_get_nth_cell(arr, i);
 
252
                os_event_free(cell->event);
 
253
        }
 
254
 
 
255
        protection = arr->protection;
 
256
 
 
257
        /* Release the mutex protecting the wait array complex */
 
258
 
 
259
        if (protection == SYNC_ARRAY_OS_MUTEX) {
 
260
                os_mutex_free(arr->os_mutex);
 
261
        } else if (protection == SYNC_ARRAY_MUTEX) {
 
262
                mutex_free(&(arr->mutex));
 
263
        } else {
 
264
                ut_error;
 
265
        }
 
266
 
 
267
        ut_free(arr->array);
 
268
        ut_free(arr);
 
269
}
 
270
 
 
271
/************************************************************************
 
272
Validates the integrity of the wait array. Checks
 
273
that the number of reserved cells equals the count variable. */
 
274
 
 
275
void
 
276
sync_array_validate(
 
277
/*================*/
 
278
        sync_array_t*   arr)    /* in: sync wait array */
 
279
{
 
280
        ulint           i;
 
281
        sync_cell_t*    cell;
 
282
        ulint           count           = 0;
 
283
 
 
284
        sync_array_enter(arr);
 
285
 
 
286
        for (i = 0; i < arr->n_cells; i++) {
 
287
                cell = sync_array_get_nth_cell(arr, i);
 
288
 
 
289
                if (cell->state == SC_RESERVED) {
 
290
                        count++;
 
291
                }
 
292
        }
 
293
 
 
294
        ut_a(count == arr->n_reserved);
 
295
 
 
296
        sync_array_exit(arr);
 
297
}
 
298
 
 
299
/**********************************************************************
 
300
Reserves a wait array cell for waiting for an object.
 
301
The event of the cell is reset to nonsignalled state. */
 
302
 
 
303
void
 
304
sync_array_reserve_cell(
 
305
/*====================*/
 
306
        sync_array_t*   arr,    /* in: wait array */
 
307
        void*           object, /* in: pointer to the object to wait for */
 
308
        ulint           type,   /* in: lock request type */
 
309
        const char*     file,   /* in: file where requested */
 
310
        ulint           line,   /* in: line where requested */
 
311
        ulint*          index)  /* out: index of the reserved cell */
 
312
{
 
313
        sync_cell_t*    cell;
 
314
        ulint           i;
 
315
 
 
316
        ut_a(object);
 
317
        ut_a(index);
 
318
 
 
319
        sync_array_enter(arr);
 
320
 
 
321
        arr->res_count++;
 
322
 
 
323
        /* Reserve a new cell. */
 
324
        for (i = 0; i < arr->n_cells; i++) {
 
325
                cell = sync_array_get_nth_cell(arr, i);
 
326
 
 
327
                if (cell->state == SC_FREE) {
 
328
 
 
329
                        /* We do not check cell->event_set because it is
 
330
                        set outside the protection of the sync array mutex
 
331
                        and we had a bug regarding it, and since resetting
 
332
                        an event when it is not needed does no harm it is
 
333
                        safer always to do it. */
 
334
 
 
335
                        cell->event_set = FALSE;
 
336
                        os_event_reset(cell->event);
 
337
 
 
338
                        cell->state = SC_RESERVED;
 
339
                        cell->reservation_time = time(NULL);
 
340
                        cell->thread = os_thread_get_curr_id();
 
341
 
 
342
                        cell->wait_object = object;
 
343
 
 
344
                        if (type == SYNC_MUTEX) {
 
345
                                cell->old_wait_mutex = object;
 
346
                        } else {
 
347
                                cell->old_wait_rw_lock = object;
 
348
                        }
 
349
 
 
350
                        cell->request_type = type;
 
351
                        cell->waiting = FALSE;
 
352
 
 
353
                        cell->file = file;
 
354
                        cell->line = line;
 
355
 
 
356
                        arr->n_reserved++;
 
357
 
 
358
                        *index = i;
 
359
 
 
360
                        sync_array_exit(arr);
 
361
 
 
362
                        return;
 
363
                }
 
364
        }
 
365
 
 
366
        ut_error; /* No free cell found */
 
367
 
 
368
        return;
 
369
}
 
370
 
 
371
/**********************************************************************
 
372
Frees the cell. Note that we don't have any mutex reserved when calling
 
373
this. */
 
374
static
 
375
void
 
376
sync_array_free_cell(
 
377
/*=================*/
 
378
        sync_array_t*   arr,    /* in: wait array */
 
379
        ulint           index)  /* in: index of the cell in array */
 
380
{
 
381
        sync_cell_t*    cell;
 
382
 
 
383
        cell = sync_array_get_nth_cell(arr, index);
 
384
 
 
385
        ut_a(cell->state == SC_WAKING_UP);
 
386
        ut_a(cell->wait_object != NULL);
 
387
 
 
388
        cell->state = SC_FREE;
 
389
}
 
390
 
 
391
/**********************************************************************
 
392
Frees the cell safely by reserving the sync array mutex and decrementing
 
393
n_reserved if necessary. Should only be called from mutex_spin_wait. */
 
394
 
 
395
void
 
396
sync_array_free_cell_protected(
 
397
/*===========================*/
 
398
        sync_array_t*   arr,    /* in: wait array */
 
399
        ulint           index)  /* in: index of the cell in array */
 
400
{
 
401
        sync_cell_t*    cell;
 
402
 
 
403
        sync_array_enter(arr);
 
404
 
 
405
        cell = sync_array_get_nth_cell(arr, index);
 
406
 
 
407
        ut_a(cell->state != SC_FREE);
 
408
        ut_a(cell->wait_object != NULL);
 
409
 
 
410
        /* We only need to decrement n_reserved if it has not already been
 
411
        done by sync_array_signal_object. */
 
412
        if (cell->state == SC_RESERVED) {
 
413
                ut_a(arr->n_reserved > 0);
 
414
                arr->n_reserved--;
 
415
        } else if (cell->state == SC_WAKING_UP) {
 
416
                /* This is tricky; if we don't wait for the event to be
 
417
                signaled, signal_object can set the state of a cell to
 
418
                SC_WAKING_UP, mutex_spin_wait can call this and set the
 
419
                state to SC_FREE, and then signal_object gets around to
 
420
                calling os_set_event for the cell but since it's already
 
421
                been freed things break horribly. */
 
422
 
 
423
                sync_array_exit(arr);
 
424
                os_event_wait(cell->event);
 
425
                sync_array_enter(arr);
 
426
        }
 
427
 
 
428
        cell->state = SC_FREE;
 
429
 
 
430
        sync_array_exit(arr);
 
431
}
 
432
 
 
433
/**********************************************************************
 
434
This function should be called when a thread starts to wait on
 
435
a wait array cell. In the debug version this function checks
 
436
if the wait for a semaphore will result in a deadlock, in which
 
437
case prints info and asserts. */
 
438
 
 
439
void
 
440
sync_array_wait_event(
 
441
/*==================*/
 
442
        sync_array_t*   arr,    /* in: wait array */
 
443
        ulint           index)  /* in: index of the reserved cell */
 
444
{
 
445
        sync_cell_t*    cell;
 
446
        os_event_t      event;
 
447
 
 
448
        ut_a(arr);
 
449
 
 
450
        cell = sync_array_get_nth_cell(arr, index);
 
451
 
 
452
        ut_a((cell->state == SC_RESERVED) || (cell->state == SC_WAKING_UP));
 
453
        ut_a(cell->wait_object);
 
454
        ut_a(!cell->waiting);
 
455
        ut_ad(os_thread_get_curr_id() == cell->thread);
 
456
 
 
457
        event = cell->event;
 
458
        cell->waiting = TRUE;
 
459
 
 
460
#ifdef UNIV_SYNC_DEBUG
 
461
 
 
462
        /* We use simple enter to the mutex below, because if
 
463
        we cannot acquire it at once, mutex_enter would call
 
464
        recursively sync_array routines, leading to trouble.
 
465
        rw_lock_debug_mutex freezes the debug lists. */
 
466
 
 
467
        sync_array_enter(arr);
 
468
        rw_lock_debug_mutex_enter();
 
469
 
 
470
        if (TRUE == sync_array_detect_deadlock(arr, cell, cell, 0)) {
 
471
 
 
472
                fputs("########################################\n", stderr);
 
473
                ut_error;
 
474
        }
 
475
 
 
476
        rw_lock_debug_mutex_exit();
 
477
        sync_array_exit(arr);
 
478
#endif
 
479
        os_event_wait(event);
 
480
 
 
481
        sync_array_free_cell(arr, index);
 
482
}
 
483
 
 
484
/**********************************************************************
 
485
Reports info of a wait array cell. Note: sync_array_print_long_waits()
 
486
calls this without mutex protection. */
 
487
static
 
488
void
 
489
sync_array_cell_print(
 
490
/*==================*/
 
491
        FILE*           file,   /* in: file where to print */
 
492
        sync_cell_t*    cell)   /* in: sync cell */
 
493
{
 
494
        mutex_t*        mutex;
 
495
        rw_lock_t*      rwlock;
 
496
        ulint           type;
 
497
 
 
498
        type = cell->request_type;
 
499
 
 
500
        fprintf(file,
 
501
                "--Thread %lu has waited at %s line %lu"
 
502
                " for %.2f seconds the semaphore:\n",
 
503
                (ulong) os_thread_pf(cell->thread), cell->file,
 
504
                (ulong) cell->line,
 
505
                difftime(time(NULL), cell->reservation_time));
 
506
        fprintf(file, "Wait array cell state %lu\n", (ulong)cell->state);
 
507
 
 
508
        /* If the memory area pointed to by old_wait_mutex /
 
509
        old_wait_rw_lock has been freed, this can crash. */
 
510
 
 
511
        if (cell->state != SC_RESERVED) {
 
512
                /* If cell has this state, then even if we are holding the sync
 
513
                array mutex, the wait object may get freed meanwhile. Do not
 
514
                print the wait object then. */
 
515
 
 
516
        } else if (type == SYNC_MUTEX) {
 
517
                /* We use old_wait_mutex in case the cell has already
 
518
                been freed meanwhile */
 
519
                mutex = cell->old_wait_mutex;
 
520
 
 
521
                fprintf(file,
 
522
                        "Mutex at %p created file %s line %lu, lock var %lu\n"
 
523
#ifdef UNIV_SYNC_DEBUG
 
524
                        "Last time reserved in file %s line %lu, "
 
525
#endif /* UNIV_SYNC_DEBUG */
 
526
                        "waiters flag %lu\n",
 
527
                        (void*) mutex, mutex->cfile_name, (ulong) mutex->cline,
 
528
                        (ulong) mutex->lock_word,
 
529
#ifdef UNIV_SYNC_DEBUG
 
530
                        mutex->file_name, (ulong) mutex->line,
 
531
#endif /* UNIV_SYNC_DEBUG */
 
532
                        (ulong) mutex->waiters);
 
533
 
 
534
        } else if (type == RW_LOCK_EX || type == RW_LOCK_SHARED) {
 
535
 
 
536
                fputs(type == RW_LOCK_EX ? "X-lock on" : "S-lock on", file);
 
537
 
 
538
                rwlock = cell->old_wait_rw_lock;
 
539
 
 
540
                fprintf(file,
 
541
                        " RW-latch at %p created in file %s line %lu\n",
 
542
                        (void*) rwlock, rwlock->cfile_name,
 
543
                        (ulong) rwlock->cline);
 
544
                if (rwlock->writer != RW_LOCK_NOT_LOCKED) {
 
545
                        fprintf(file,
 
546
                                "a writer (thread id %lu) has"
 
547
                                " reserved it in mode %s",
 
548
                                (ulong) os_thread_pf(rwlock->writer_thread),
 
549
                                rwlock->writer == RW_LOCK_EX
 
550
                                ? " exclusive\n"
 
551
                                : " wait exclusive\n");
 
552
                }
 
553
 
 
554
                fprintf(file,
 
555
                        "number of readers %lu, waiters flag %lu\n"
 
556
                        "Last time read locked in file %s line %lu\n"
 
557
                        "Last time write locked in file %s line %lu\n",
 
558
                        (ulong) rwlock->reader_count,
 
559
                        (ulong) rwlock->waiters,
 
560
                        rwlock->last_s_file_name,
 
561
                        (ulong) rwlock->last_s_line,
 
562
                        rwlock->last_x_file_name,
 
563
                        (ulong) rwlock->last_x_line);
 
564
        } else {
 
565
                ut_error;
 
566
        }
 
567
 
 
568
        if (cell->event_set) {
 
569
                fputs("wait is ending\n", file);
 
570
        }
 
571
}
 
572
 
 
573
#ifdef UNIV_SYNC_DEBUG
 
574
/**********************************************************************
 
575
Looks for a cell with the given thread id. */
 
576
static
 
577
sync_cell_t*
 
578
sync_array_find_thread(
 
579
/*===================*/
 
580
                                /* out: pointer to cell or NULL
 
581
                                if not found */
 
582
        sync_array_t*   arr,    /* in: wait array */
 
583
        os_thread_id_t  thread) /* in: thread id */
 
584
{
 
585
        ulint           i;
 
586
        sync_cell_t*    cell;
 
587
 
 
588
        for (i = 0; i < arr->n_cells; i++) {
 
589
 
 
590
                cell = sync_array_get_nth_cell(arr, i);
 
591
 
 
592
                if ((cell->state == SC_RESERVED)
 
593
                    && os_thread_eq(cell->thread, thread)) {
 
594
 
 
595
                        return(cell);   /* Found */
 
596
                }
 
597
        }
 
598
 
 
599
        return(NULL);   /* Not found */
 
600
}
 
601
 
 
602
/**********************************************************************
 
603
Recursion step for deadlock detection. */
 
604
static
 
605
ibool
 
606
sync_array_deadlock_step(
 
607
/*=====================*/
 
608
                                /* out: TRUE if deadlock detected */
 
609
        sync_array_t*   arr,    /* in: wait array; NOTE! the caller must
 
610
                                own the mutex to array */
 
611
        sync_cell_t*    start,  /* in: cell where recursive search
 
612
                                started */
 
613
        os_thread_id_t  thread, /* in: thread to look at */
 
614
        ulint           pass,   /* in: pass value */
 
615
        ulint           depth)  /* in: recursion depth */
 
616
{
 
617
        sync_cell_t*    new;
 
618
        ibool           ret;
 
619
 
 
620
        depth++;
 
621
 
 
622
        if (pass != 0) {
 
623
                /* If pass != 0, then we do not know which threads are
 
624
                responsible of releasing the lock, and no deadlock can
 
625
                be detected. */
 
626
 
 
627
                return(FALSE);
 
628
        }
 
629
 
 
630
        new = sync_array_find_thread(arr, thread);
 
631
 
 
632
        if (new == start) {
 
633
                /* Stop running of other threads */
 
634
 
 
635
                ut_dbg_stop_threads = TRUE;
 
636
 
 
637
                /* Deadlock */
 
638
                fputs("########################################\n"
 
639
                      "DEADLOCK of threads detected!\n", stderr);
 
640
 
 
641
                return(TRUE);
 
642
 
 
643
        } else if (new) {
 
644
                ret = sync_array_detect_deadlock(arr, start, new, depth);
 
645
 
 
646
                if (ret) {
 
647
                        return(TRUE);
 
648
                }
 
649
        }
 
650
        return(FALSE);
 
651
}
 
652
 
 
653
/**********************************************************************
 
654
This function is called only in the debug version. Detects a deadlock
 
655
of one or more threads because of waits of semaphores. */
 
656
static
 
657
ibool
 
658
sync_array_detect_deadlock(
 
659
/*=======================*/
 
660
                                /* out: TRUE if deadlock detected */
 
661
        sync_array_t*   arr,    /* in: wait array; NOTE! the caller must
 
662
                                own the mutex to array */
 
663
        sync_cell_t*    start,  /* in: cell where recursive search started */
 
664
        sync_cell_t*    cell,   /* in: cell to search */
 
665
        ulint           depth)  /* in: recursion depth */
 
666
{
 
667
        mutex_t*        mutex;
 
668
        rw_lock_t*      lock;
 
669
        os_thread_id_t  thread;
 
670
        ibool           ret;
 
671
        rw_lock_debug_t*debug;
 
672
 
 
673
        ut_a(arr);
 
674
        ut_a(start);
 
675
        ut_a(cell);
 
676
        ut_ad(cell->wait_object);
 
677
        ut_ad(os_thread_get_curr_id() == start->thread);
 
678
        ut_ad(depth < 100);
 
679
 
 
680
        depth++;
 
681
 
 
682
        if (cell->event_set || !cell->waiting) {
 
683
 
 
684
                return(FALSE); /* No deadlock here */
 
685
        }
 
686
 
 
687
        if (cell->request_type == SYNC_MUTEX) {
 
688
 
 
689
                mutex = cell->wait_object;
 
690
 
 
691
                if (mutex_get_lock_word(mutex) != 0) {
 
692
 
 
693
                        thread = mutex->thread_id;
 
694
 
 
695
                        /* Note that mutex->thread_id above may be
 
696
                        also OS_THREAD_ID_UNDEFINED, because the
 
697
                        thread which held the mutex maybe has not
 
698
                        yet updated the value, or it has already
 
699
                        released the mutex: in this case no deadlock
 
700
                        can occur, as the wait array cannot contain
 
701
                        a thread with ID_UNDEFINED value. */
 
702
 
 
703
                        ret = sync_array_deadlock_step(arr, start, thread, 0,
 
704
                                                       depth);
 
705
                        if (ret) {
 
706
                                fprintf(stderr,
 
707
                                        "Mutex %p owned by thread %lu"
 
708
                                        " file %s line %lu\n",
 
709
                                        (void*) mutex,
 
710
                                        (ulong) os_thread_pf(mutex->thread_id),
 
711
                                        mutex->file_name, (ulong) mutex->line);
 
712
                                sync_array_cell_print(stderr, cell);
 
713
 
 
714
                                return(TRUE);
 
715
                        }
 
716
                }
 
717
 
 
718
                return(FALSE); /* No deadlock */
 
719
 
 
720
        } else if (cell->request_type == RW_LOCK_EX) {
 
721
 
 
722
                lock = cell->wait_object;
 
723
 
 
724
                debug = UT_LIST_GET_FIRST(lock->debug_list);
 
725
 
 
726
                while (debug != NULL) {
 
727
 
 
728
                        thread = debug->thread_id;
 
729
 
 
730
                        if (((debug->lock_type == RW_LOCK_EX)
 
731
                             && !os_thread_eq(thread, cell->thread))
 
732
                            || ((debug->lock_type == RW_LOCK_WAIT_EX)
 
733
                                && !os_thread_eq(thread, cell->thread))
 
734
                            || (debug->lock_type == RW_LOCK_SHARED)) {
 
735
 
 
736
                                /* The (wait) x-lock request can block
 
737
                                infinitely only if someone (can be also cell
 
738
                                thread) is holding s-lock, or someone
 
739
                                (cannot be cell thread) (wait) x-lock, and
 
740
                                he is blocked by start thread */
 
741
 
 
742
                                ret = sync_array_deadlock_step(
 
743
                                        arr, start, thread, debug->pass,
 
744
                                        depth);
 
745
                                if (ret) {
 
746
print:
 
747
                                        fprintf(stderr, "rw-lock %p ",
 
748
                                                (void*) lock);
 
749
                                        sync_array_cell_print(stderr, cell);
 
750
                                        rw_lock_debug_print(debug);
 
751
                                        return(TRUE);
 
752
                                }
 
753
                        }
 
754
 
 
755
                        debug = UT_LIST_GET_NEXT(list, debug);
 
756
                }
 
757
 
 
758
                return(FALSE);
 
759
 
 
760
        } else if (cell->request_type == RW_LOCK_SHARED) {
 
761
 
 
762
                lock = cell->wait_object;
 
763
                debug = UT_LIST_GET_FIRST(lock->debug_list);
 
764
 
 
765
                while (debug != NULL) {
 
766
 
 
767
                        thread = debug->thread_id;
 
768
 
 
769
                        if ((debug->lock_type == RW_LOCK_EX)
 
770
                            || (debug->lock_type == RW_LOCK_WAIT_EX)) {
 
771
 
 
772
                                /* The s-lock request can block infinitely
 
773
                                only if someone (can also be cell thread) is
 
774
                                holding (wait) x-lock, and he is blocked by
 
775
                                start thread */
 
776
 
 
777
                                ret = sync_array_deadlock_step(
 
778
                                        arr, start, thread, debug->pass,
 
779
                                        depth);
 
780
                                if (ret) {
 
781
                                        goto print;
 
782
                                }
 
783
                        }
 
784
 
 
785
                        debug = UT_LIST_GET_NEXT(list, debug);
 
786
                }
 
787
 
 
788
                return(FALSE);
 
789
 
 
790
        } else {
 
791
                ut_error;
 
792
        }
 
793
 
 
794
        return(TRUE);   /* Execution never reaches this line: for compiler
 
795
                        fooling only */
 
796
}
 
797
#endif /* UNIV_SYNC_DEBUG */
 
798
 
 
799
/**********************************************************************
 
800
Determines if we can wake up the thread waiting for a sempahore. */
 
801
static
 
802
ibool
 
803
sync_arr_cell_can_wake_up(
 
804
/*======================*/
 
805
        sync_cell_t*    cell)   /* in: cell to search */
 
806
{
 
807
        mutex_t*        mutex;
 
808
        rw_lock_t*      lock;
 
809
 
 
810
        if (cell->request_type == SYNC_MUTEX) {
 
811
 
 
812
                mutex = cell->wait_object;
 
813
 
 
814
                if (mutex_get_lock_word(mutex) == 0) {
 
815
 
 
816
                        return(TRUE);
 
817
                }
 
818
 
 
819
        } else if (cell->request_type == RW_LOCK_EX) {
 
820
 
 
821
                lock = cell->wait_object;
 
822
 
 
823
                if (rw_lock_get_reader_count(lock) == 0
 
824
                    && rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
 
825
 
 
826
                        return(TRUE);
 
827
                }
 
828
 
 
829
                if (rw_lock_get_reader_count(lock) == 0
 
830
                    && rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX
 
831
                    && os_thread_eq(lock->writer_thread, cell->thread)) {
 
832
 
 
833
                        return(TRUE);
 
834
                }
 
835
 
 
836
        } else if (cell->request_type == RW_LOCK_SHARED) {
 
837
                lock = cell->wait_object;
 
838
 
 
839
                if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
 
840
 
 
841
                        return(TRUE);
 
842
                }
 
843
        }
 
844
 
 
845
        return(FALSE);
 
846
}
 
847
 
 
848
/**************************************************************************
 
849
Looks for the cells in the wait array which refer to the wait object
 
850
specified, and sets their corresponding events to the signaled state. In this
 
851
way releases the threads waiting for the object to contend for the object.
 
852
It is possible that no such cell is found, in which case does nothing. */
 
853
 
 
854
void
 
855
sync_array_signal_object(
 
856
/*=====================*/
 
857
        sync_array_t*   arr,    /* in: wait array */
 
858
        void*           object) /* in: wait object */
 
859
{
 
860
        sync_cell_t*    cell;
 
861
        ulint           count;
 
862
        ulint           i;
 
863
        ulint           res_count;
 
864
 
 
865
        /* We store the addresses of cells we need to signal and signal
 
866
        them only after we have released the sync array's mutex (for
 
867
        performance reasons). cell_count is the number of such cells, and
 
868
        cell_ptr points to the first one. If there are less than
 
869
        UT_ARR_SIZE(cells) of them, cell_ptr == &cells[0], otherwise
 
870
        cell_ptr points to malloc'd memory that we must free. */
 
871
 
 
872
        sync_cell_t*    cells[100];
 
873
        sync_cell_t**   cell_ptr = &cells[0];
 
874
        ulint           cell_count = 0;
 
875
        ulint           cell_max_count = UT_ARR_SIZE(cells);
 
876
 
 
877
        ut_a(100 == cell_max_count);
 
878
 
 
879
        sync_array_enter(arr);
 
880
 
 
881
        arr->sg_count++;
 
882
 
 
883
        i = 0;
 
884
        count = 0;
 
885
 
 
886
        /* We need to store this to a local variable because it is modified
 
887
        inside the loop */
 
888
        res_count = arr->n_reserved;
 
889
 
 
890
        while (count < res_count) {
 
891
 
 
892
                cell = sync_array_get_nth_cell(arr, i);
 
893
 
 
894
                if (cell->state == SC_RESERVED) {
 
895
 
 
896
                        count++;
 
897
                        if (cell->wait_object == object) {
 
898
                                cell->state = SC_WAKING_UP;
 
899
 
 
900
                                ut_a(arr->n_reserved > 0);
 
901
                                arr->n_reserved--;
 
902
 
 
903
                                if (cell_count == cell_max_count) {
 
904
                                        sync_cell_t** old_cell_ptr = cell_ptr;
 
905
                                        size_t old_size, new_size;
 
906
 
 
907
                                        old_size = cell_max_count
 
908
                                                * sizeof(sync_cell_t*);
 
909
                                        cell_max_count *= 2;
 
910
                                        new_size = cell_max_count
 
911
                                                * sizeof(sync_cell_t*);
 
912
 
 
913
                                        cell_ptr = malloc(new_size);
 
914
                                        ut_a(cell_ptr);
 
915
                                        memcpy(cell_ptr, old_cell_ptr,
 
916
                                               old_size);
 
917
 
 
918
                                        if (old_cell_ptr != &cells[0]) {
 
919
                                                free(old_cell_ptr);
 
920
                                        }
 
921
                                }
 
922
 
 
923
                                cell_ptr[cell_count] = cell;
 
924
                                cell_count++;
 
925
                        }
 
926
                }
 
927
 
 
928
                i++;
 
929
        }
 
930
 
 
931
        sync_array_exit(arr);
 
932
 
 
933
        for (i = 0; i < cell_count; i++) {
 
934
                cell = cell_ptr[i];
 
935
 
 
936
                cell->event_set = TRUE;
 
937
                os_event_set(cell->event);
 
938
        }
 
939
 
 
940
        if (cell_ptr != &cells[0]) {
 
941
                free(cell_ptr);
 
942
        }
 
943
}
 
944
 
 
945
/**************************************************************************
 
946
If the wakeup algorithm does not work perfectly at semaphore relases,
 
947
this function will do the waking (see the comment in mutex_exit). This
 
948
function should be called about every 1 second in the server.
 
949
 
 
950
Note that there's a race condition between this thread and mutex_exit
 
951
changing the lock_word and calling signal_object, so sometimes this finds
 
952
threads to wake up even when nothing has gone wrong. */
 
953
 
 
954
void
 
955
sync_arr_wake_threads_if_sema_free(void)
 
956
/*====================================*/
 
957
{
 
958
        sync_array_t*   arr     = sync_primary_wait_array;
 
959
        sync_cell_t*    cell;
 
960
        ulint           count;
 
961
        ulint           i;
 
962
        ulint           res_count;
 
963
 
 
964
        sync_array_enter(arr);
 
965
 
 
966
        i = 0;
 
967
        count = 0;
 
968
 
 
969
        /* We need to store this to a local variable because it is modified
 
970
        inside the loop */
 
971
 
 
972
        res_count = arr->n_reserved;
 
973
 
 
974
        while (count < res_count) {
 
975
 
 
976
                cell = sync_array_get_nth_cell(arr, i);
 
977
 
 
978
                if (cell->state == SC_RESERVED) {
 
979
 
 
980
                        count++;
 
981
 
 
982
                        if (sync_arr_cell_can_wake_up(cell)) {
 
983
                                cell->state = SC_WAKING_UP;
 
984
                                cell->event_set = TRUE;
 
985
                                os_event_set(cell->event);
 
986
 
 
987
                                ut_a(arr->n_reserved > 0);
 
988
                                arr->n_reserved--;
 
989
                        }
 
990
                }
 
991
 
 
992
                i++;
 
993
        }
 
994
 
 
995
        sync_array_exit(arr);
 
996
}
 
997
 
 
998
/**************************************************************************
 
999
Prints warnings of long semaphore waits to stderr. */
 
1000
 
 
1001
ibool
 
1002
sync_array_print_long_waits(void)
 
1003
/*=============================*/
 
1004
                        /* out: TRUE if fatal semaphore wait threshold
 
1005
                        was exceeded */
 
1006
{
 
1007
        sync_cell_t*    cell;
 
1008
        ibool           old_val;
 
1009
        ibool           noticed = FALSE;
 
1010
        ulint           i;
 
1011
        ulint           fatal_timeout = srv_fatal_semaphore_wait_threshold;
 
1012
        ibool           fatal = FALSE;
 
1013
 
 
1014
        for (i = 0; i < sync_primary_wait_array->n_cells; i++) {
 
1015
 
 
1016
                cell = sync_array_get_nth_cell(sync_primary_wait_array, i);
 
1017
 
 
1018
                if ((cell->state != SC_FREE)
 
1019
                    && difftime(time(NULL), cell->reservation_time) > 240) {
 
1020
                        fputs("InnoDB: Warning: a long semaphore wait:\n",
 
1021
                              stderr);
 
1022
                        sync_array_cell_print(stderr, cell);
 
1023
                        noticed = TRUE;
 
1024
                }
 
1025
 
 
1026
                if ((cell->state != SC_FREE)
 
1027
                    && difftime(time(NULL), cell->reservation_time)
 
1028
                    > fatal_timeout) {
 
1029
                        fatal = TRUE;
 
1030
                }
 
1031
        }
 
1032
 
 
1033
        if (noticed) {
 
1034
                fprintf(stderr,
 
1035
                        "InnoDB: ###### Starts InnoDB Monitor"
 
1036
                        " for 30 secs to print diagnostic info:\n");
 
1037
                old_val = srv_print_innodb_monitor;
 
1038
 
 
1039
                /* If some crucial semaphore is reserved, then also the InnoDB
 
1040
                Monitor can hang, and we do not get diagnostics. Since in
 
1041
                many cases an InnoDB hang is caused by a pwrite() or a pread()
 
1042
                call hanging inside the operating system, let us print right
 
1043
                now the values of pending calls of these. */
 
1044
 
 
1045
                fprintf(stderr,
 
1046
                        "InnoDB: Pending preads %lu, pwrites %lu\n",
 
1047
                        (ulong)os_file_n_pending_preads,
 
1048
                        (ulong)os_file_n_pending_pwrites);
 
1049
 
 
1050
                srv_print_innodb_monitor = TRUE;
 
1051
                os_event_set(srv_lock_timeout_thread_event);
 
1052
 
 
1053
                os_thread_sleep(30000000);
 
1054
 
 
1055
                srv_print_innodb_monitor = old_val;
 
1056
                fprintf(stderr,
 
1057
                        "InnoDB: ###### Diagnostic info printed"
 
1058
                        " to the standard error stream\n");
 
1059
        }
 
1060
 
 
1061
        return(fatal);
 
1062
}
 
1063
 
 
1064
/**************************************************************************
 
1065
Prints info of the wait array. */
 
1066
static
 
1067
void
 
1068
sync_array_output_info(
 
1069
/*===================*/
 
1070
        FILE*           file,   /* in: file where to print */
 
1071
        sync_array_t*   arr)    /* in: wait array; NOTE! caller must own the
 
1072
                                mutex */
 
1073
{
 
1074
        sync_cell_t*    cell;
 
1075
        ulint           i;
 
1076
 
 
1077
        fprintf(file,
 
1078
                "OS WAIT ARRAY INFO: reservation count %ld,"
 
1079
                " signal count %ld\n",
 
1080
                (long) arr->res_count,
 
1081
                (long) arr->sg_count);
 
1082
        for (i = 0; i < arr->n_cells; i++) {
 
1083
 
 
1084
                cell = sync_array_get_nth_cell(arr, i);
 
1085
 
 
1086
                if (cell->state != SC_FREE) {
 
1087
                        sync_array_cell_print(file, cell);
 
1088
                }
 
1089
        }
 
1090
}
 
1091
 
 
1092
/**************************************************************************
 
1093
Prints info of the wait array. */
 
1094
 
 
1095
void
 
1096
sync_array_print_info(
 
1097
/*==================*/
 
1098
        FILE*           file,   /* in: file where to print */
 
1099
        sync_array_t*   arr)    /* in: wait array */
 
1100
{
 
1101
        sync_array_enter(arr);
 
1102
 
 
1103
        sync_array_output_info(file, arr);
 
1104
 
 
1105
        sync_array_exit(arr);
 
1106
}
 
1107