~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/xaction_xt.cc

  • Committer: Monty Taylor
  • Date: 2009-04-12 08:14:18 UTC
  • mto: (992.1.1 mordred)
  • mto: This revision was merged to the branch mainline in revision 990.
  • Revision ID: mordred@inaugust.com-20090412081418-dc6gh3g3awkrhwov
Merged compress, uncompress and uncompressed_length into one plugin lib. Yay new plugin registration!

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2005 PrimeBase Technologies GmbH
2
 
 *
3
 
 * PrimeBase XT
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * 2005-04-10   Paul McCullagh
20
 
 *
21
 
 * H&G2JCtL
22
 
 */
23
 
 
24
 
#include "xt_config.h"
25
 
 
26
 
#ifdef DRIZZLED
27
 
#include <bitset>
28
 
#endif
29
 
 
30
 
#include <time.h>
31
 
#include <signal.h>
32
 
 
33
 
#include "xaction_xt.h"
34
 
#include "database_xt.h"
35
 
#include "strutil_xt.h"
36
 
#include "heap_xt.h"
37
 
#include "trace_xt.h"
38
 
#include "myxt_xt.h"
39
 
#include "tabcache_xt.h"
40
 
 
41
 
#ifdef DEBUG
42
 
//#define TRACE_WAIT_FOR
43
 
//#define TRACE_VARIATIONS
44
 
//#define TRACE_SWEEPER_ACTIVITY
45
 
 
46
 
/* Enable to trace the statements executed by the engine: */
47
 
//#define TRACE_STATEMENTS
48
 
#endif
49
 
 
50
 
#if defined(TRACE_STATEMENTS) || defined(TRACE_VARIATIONS)
51
 
#define TRACE_TRANSACTION
52
 
#endif
53
 
 
54
 
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs);
55
 
static xtBool xn_get_xact_details(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), int *flags, xtXactID *start, xtXactID *end, xtThreadID *thd_id);
56
 
static xtBool xn_get_xact_pointer(XTDatabaseHPtr db, xtXactID xn_id, XTXactDataPtr *xact_ptr);
57
 
 
58
 
/* ============================================================================================== */
59
 
 
60
 
typedef struct XNSWRecItem {
61
 
        xtTableID                               ri_tab_id;
62
 
        xtRecordID                              ri_rec_id;
63
 
} XNSWRecItemRec, *XNSWRecItemPtr;
64
 
 
65
 
typedef struct XNSWToFreeItem {
66
 
        xtTableID                               ri_tab_id;                      /* If non-zero, then this is the table of the data record to be freed.
67
 
                                                                                                 * If zero, then this free the transaction below must be freed.
68
 
                                                                                                 */
69
 
        union {
70
 
                xtRecordID                      ri_rec_id;
71
 
                xtXactID                        ri_xn_id;
72
 
        } x;
73
 
        xtXactID                                ri_wait_xn_id;          /* Wait for this transaction to be cleaned (or being cleaned up)
74
 
                                                                                                 * before freeing this resource. */
75
 
} XNSWToFreeItemRec, *XNSWToFreeItemPtr;
76
 
 
77
 
/* ----------------------------------------------------------------------
78
 
 * WAIT FOR TRANSACTIONS
79
 
 */
80
 
 
81
 
typedef struct XNWaitFor {
82
 
        xtXactID                                wf_waiting_xn_id;               /* The transaction of the waiting thread. */
83
 
        xtXactID                                wf_for_me_xn_id;                /* The transaction we are waiting for. */
84
 
} XNWaitForRec, *XNWaitForPtr;
85
 
 
86
 
static int xn_compare_wait_for(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
87
 
{
88
 
        xtXactID                *x = (xtXactID *) a;
89
 
        XNWaitForPtr    y = (XNWaitForPtr) b;
90
 
 
91
 
        if (*x == y->wf_waiting_xn_id)
92
 
                return 0;
93
 
        if (xt_xn_is_before(*x, y->wf_waiting_xn_id))
94
 
                return -1;
95
 
        return 1;
96
 
}
97
 
 
98
 
static void xn_free_wait_for(XTThreadPtr XT_UNUSED(self), void *XT_UNUSED(thunk), void *XT_UNUSED(item))
99
 
{
100
 
}
101
 
 
102
 
/*
103
 
 * A deadlock occurs when a transaction is waiting for itself!
104
 
 * For example A is waiting for B which is waiting for A.
105
 
 * By repeatedly scanning the wait_for list we can find out if a
106
 
 * transaction is waiting for itself.
107
 
 */
108
 
static xtBool xn_detect_deadlock(XTDatabaseHPtr db, xtXactID waiting, xtXactID for_me)
109
 
{
110
 
        XNWaitForPtr wf;
111
 
 
112
 
        for (;;) {
113
 
                if (waiting == for_me) {
114
 
#ifdef TRACE_WAIT_FOR
115
 
                        for (u_int i=0; i<xt_sl_get_size(db->db_xn_wait_for); i++) {
116
 
                                wf = (XNWaitForPtr) xt_sl_item_at(db->db_xn_wait_for, i);
117
 
                                xt_trace("T%lu --> T%lu\n", (u_long) wf->wf_waiting_xn_id, (u_long) wf->wf_for_me_xn_id);
118
 
                        }
119
 
                        xt_ttracef(xt_get_self(), "DEADLOCK\n");
120
 
                        xt_dump_trace();
121
 
#endif
122
 
                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_DEADLOCK);
123
 
                        return TRUE;
124
 
                }
125
 
                if (!(wf = (XNWaitForPtr) xt_sl_find(NULL, db->db_xn_wait_for, &for_me)))
126
 
                        break;
127
 
                for_me = wf->wf_for_me_xn_id;
128
 
        }
129
 
        return FALSE;
130
 
}
131
 
 
132
 
#ifdef XT_USE_SPINLOCK_WAIT_FOR
133
 
 
134
 
/* Number of busy-wait iterations before falling back to a blocking wait. */
#if !defined(XT_MAC) && !defined(XT_WIN)
#define WAIT_SPIN_COUNT			50
#else
#define WAIT_SPIN_COUNT			10
#endif

/*
 * Timeout of each blocking wait (1000 is described as "a second", i.e.
 * milliseconds). A wakeup should always be delivered; the timeout only guards
 * against a missed wakeup. DEBUG builds use a longer timeout.
 */
#ifndef DEBUG
#define WAIT_FOR_XACT_TIME		1000
#else
#define WAIT_FOR_XACT_TIME		30000
#endif
148
 
 
149
 
static xtBool xn_add_to_wait_for(XTDatabaseHPtr db, XNWaitForPtr wf, XTThreadPtr thread)
150
 
{
151
 
        /* If we are waiting for a transaction to end, 
152
 
         * put this thread on the wait list...
153
 
         *
154
 
         * As long as the temporary lock is removed
155
 
         * or turned into a permanent lock before
156
 
         * a thread waits again, all should be OK!
157
 
         */
158
 
        xt_spinlock_lock(&db->db_xn_wait_spinlock);
159
 
 
160
 
#ifdef TRACE_WAIT_FOR
161
 
        xt_ttracef(thread, "T%lu -wait-> T%lu\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) wait_xn_id);
162
 
#endif
163
 
        /* Check for a deadlock: */
164
 
        if (xn_detect_deadlock(db, wf->wf_waiting_xn_id, wf->wf_for_me_xn_id))
165
 
                goto failed;
166
 
 
167
 
        /* We will wait for this transaction... */
168
 
        db->db_xn_wait_count++;
169
 
        if (thread->st_xact_writer)
170
 
                db->db_xn_writer_wait_count++;
171
 
 
172
 
        if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf->wf_waiting_xn_id, wf)) {
173
 
                db->db_xn_wait_count--;
174
 
                goto failed;
175
 
        }
176
 
 
177
 
        xt_spinlock_unlock(&db->db_xn_wait_spinlock);
178
 
        return OK;
179
 
 
180
 
        failed:
181
 
        xt_spinlock_unlock(&db->db_xn_wait_spinlock);
182
 
        return FAILED;
183
 
}
184
 
 
185
 
inline void xn_remove_from_wait_for(XTDatabaseHPtr db, XNWaitForPtr wf, XTThreadPtr thread)
186
 
{
187
 
        xt_spinlock_lock(&db->db_xn_wait_spinlock);
188
 
 
189
 
        xt_sl_delete(NULL, db->db_xn_wait_for, &wf->wf_waiting_xn_id);
190
 
        db->db_xn_wait_count--;
191
 
        if (thread->st_xact_writer)
192
 
                db->db_xn_writer_wait_count--;
193
 
 
194
 
#ifdef TRACE_WAIT_FOR
195
 
        xt_ttracef(thread, "T%lu -wait-> T%lu FAILED\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) wait_xn_id);
196
 
#endif
197
 
        xt_spinlock_unlock(&db->db_xn_wait_spinlock);
198
 
}
199
 
 
200
 
/* Wait for a transation to terminate or a lock to be granted.
201
 
 *
202
 
 * If term_req is TRUE, then the termination of the transaction is required
203
 
 * before continuing.
204
 
 *
205
 
 * If pw_func is set then this function will not return before this call has
206
 
 * succeeded.
207
 
 *
208
 
 * This function returns FAILE on error.
209
 
 */
210
 
xtPublic xtBool xt_xn_wait_for_xact(XTThreadPtr thread, XTXactWaitPtr xw, XTLockWaitPtr lw)
211
 
{
212
 
        XTDatabaseHPtr          db = thread->st_database;
213
 
        XNWaitForRec            wf;
214
 
        int                                     flags = 0;
215
 
        xtXactID                        start = 0;
216
 
        XTXactDataPtr           wait_xact_ptr;
217
 
        xtBool                          on_wait_list = FALSE;
218
 
        XTXactWaitRec           xw_new;
219
 
        u_int                           loop_count = 0;
220
 
        XTWaitThreadPtr         my_wt;
221
 
 
222
 
        ASSERT_NS(thread->st_xact_data);
223
 
        thread->st_statistics.st_wait_for_xact++;
224
 
 
225
 
        wf.wf_waiting_xn_id = thread->st_xact_data->xd_start_xn_id;
226
 
 
227
 
        if (lw) {
228
 
                /* If we are here, then the lw structure is on the wait
229
 
                 * queue for the given lock.
230
 
                 */
231
 
                xtXactID locking_xn_id;
232
 
                
233
 
                wait_for_locker:
234
 
                locking_xn_id = lw->lw_xn_id;
235
 
                wf.wf_for_me_xn_id = lw->lw_xn_id;
236
 
                if (!xn_add_to_wait_for(db, &wf, thread)) {
237
 
                        lw->lw_ot->ot_table->tab_locks.xt_cancel_temp_lock(lw);
238
 
                        return FAILED;
239
 
                }
240
 
 
241
 
                while (loop_count < WAIT_SPIN_COUNT) {
242
 
                        loop_count++;
243
 
 
244
 
                        switch (lw->lw_curr_lock) {
245
 
                                case XT_LOCK_ERR:
246
 
                                        xn_remove_from_wait_for(db, &wf, thread);
247
 
                                        return FAILED;
248
 
                                case XT_NO_LOCK:
249
 
                                        /* Got the lock: */
250
 
                                        /* Check if we must also wait for the transaction: */
251
 
                                        if (lw->lw_row_updated) {
252
 
                                                /* This will override the xw passed in.
253
 
                                                 * The reason is, because we are actually waiting
254
 
                                                 * for a lock, and the lock owner may have changed
255
 
                                                 * while we were waiting for the lock.
256
 
                                                 */
257
 
                                                xw_new.xw_xn_id = lw->lw_updating_xn_id;
258
 
                                                xw = &xw_new;
259
 
                                        }
260
 
                                        if (xw) {
261
 
                                                if (wf.wf_for_me_xn_id == xw->xw_xn_id)
262
 
                                                        on_wait_list = TRUE;
263
 
                                                else
264
 
                                                        xn_remove_from_wait_for(db, &wf, thread);
265
 
                                                goto wait_for_xact;
266
 
                                        }
267
 
                                        xn_remove_from_wait_for(db, &wf, thread);
268
 
                                        return OK;
269
 
                                case XT_TEMP_LOCK:
270
 
                                case XT_PERM_LOCK:
271
 
                                        if (locking_xn_id != lw->lw_xn_id) {
272
 
                                                /* Change the transaction that we are waiting for: */
273
 
                                                xn_remove_from_wait_for(db, &wf, thread);
274
 
                                                goto wait_for_locker;
275
 
                                        }
276
 
                                        break;
277
 
                        }
278
 
 
279
 
                        xt_critical_wait();
280
 
                }
281
 
 
282
 
 
283
 
                /* The non-spinning version... */
284
 
                wait_for_locker_no_spin:
285
 
                THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
286
 
                my_wt = xt_thr_array[thread->t_id].td_waiting;
287
 
                THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
288
 
                xt_lock_mutex_ns(&my_wt->wt_lock);
289
 
 
290
 
                for (;;) {
291
 
                        switch (lw->lw_curr_lock) {
292
 
                                case XT_LOCK_ERR:
293
 
                                        xt_unlock_mutex_ns(&my_wt->wt_lock);
294
 
                                        xn_remove_from_wait_for(db, &wf, thread);
295
 
                                        return FAILED;
296
 
                                case XT_NO_LOCK:
297
 
                                        xt_unlock_mutex_ns(&my_wt->wt_lock);
298
 
                                        if (lw->lw_row_updated) {
299
 
                                                xw_new.xw_xn_id = lw->lw_updating_xn_id;
300
 
                                                xw = &xw_new;
301
 
                                        }
302
 
                                        if (xw) {
303
 
                                                if (wf.wf_for_me_xn_id == xw->xw_xn_id)
304
 
                                                        on_wait_list = TRUE;
305
 
                                                else
306
 
                                                        xn_remove_from_wait_for(db, &wf, thread);
307
 
                                                goto wait_for_xact;
308
 
                                        }
309
 
                                        xn_remove_from_wait_for(db, &wf, thread);
310
 
                                        return OK;
311
 
                                case XT_TEMP_LOCK:
312
 
                                case XT_PERM_LOCK:
313
 
                                        if (locking_xn_id != lw->lw_xn_id) {
314
 
                                                /* Change the transaction that we are waiting for: */
315
 
                                                xt_unlock_mutex_ns(&my_wt->wt_lock);
316
 
                                                xn_remove_from_wait_for(db, &wf, thread);
317
 
                                                locking_xn_id = lw->lw_xn_id;
318
 
                                                wf.wf_for_me_xn_id = lw->lw_xn_id;
319
 
                                                if (!xn_add_to_wait_for(db, &wf, thread)) {
320
 
                                                        lw->lw_ot->ot_table->tab_locks.xt_cancel_temp_lock(lw);
321
 
                                                        return FAILED;
322
 
                                                }
323
 
                                                goto wait_for_locker_no_spin;
324
 
                                        }
325
 
                                        break;
326
 
                        }
327
 
 
328
 
                        xt_timed_wait_cond_ns(&my_wt->wt_cond, &my_wt->wt_lock, WAIT_FOR_XACT_TIME);
329
 
                }
330
 
 
331
 
                /* Unreachable
332
 
                xt_unlock_mutex_ns(&my_wt->wt_lock);
333
 
                */
334
 
        }
335
 
 
336
 
        if (xw) {
337
 
                xtThreadID              tn_thd_id;
338
 
 
339
 
                wait_for_xact:
340
 
                wf.wf_for_me_xn_id = xw->xw_xn_id;
341
 
 
342
 
                if (!xn_get_xact_pointer(db, xw->xw_xn_id, &wait_xact_ptr))
343
 
                        /* The transaction was not found... */
344
 
                        goto wait_done;
345
 
 
346
 
                if (wait_xact_ptr) {
347
 
                        /* This is a dirty read, but it should work! */
348
 
                        flags = wait_xact_ptr->xd_flags;
349
 
                        start = wait_xact_ptr->xd_start_xn_id;
350
 
                        tn_thd_id = wait_xact_ptr->xd_thread_id;
351
 
                }
352
 
                else {
353
 
                        tn_thd_id = 0;
354
 
                        if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, &tn_thd_id))
355
 
                                flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
356
 
                }
357
 
 
358
 
                if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
359
 
                        /* The transaction has terminated! */
360
 
                        goto wait_done;
361
 
 
362
 
                /* Tell the thread we are waiting for it: */
363
 
                xt_add_to_wakeup_list(thread->t_id, tn_thd_id);
364
 
 
365
 
                if (!on_wait_list) {
366
 
                        if (!xn_add_to_wait_for(db, &wf, thread))
367
 
                                return FAILED;
368
 
                        on_wait_list = TRUE;
369
 
                }
370
 
 
371
 
                /* The spinning version: */
372
 
                while (loop_count < WAIT_SPIN_COUNT) {
373
 
                        loop_count++;
374
 
 
375
 
                        xt_critical_wait();
376
 
 
377
 
                        if (wait_xact_ptr) {
378
 
                                /* This is a dirty read, but it should work! */
379
 
                                flags = wait_xact_ptr->xd_flags;
380
 
                                start = wait_xact_ptr->xd_start_xn_id;
381
 
                        }
382
 
                        else {
383
 
                                if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, NULL))
384
 
                                        flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
385
 
                        }
386
 
 
387
 
                        if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
388
 
                                /* The transaction has terminated! */
389
 
                                goto wait_done;
390
 
                }
391
 
 
392
 
                /* The non-spinning version:
393
 
                 *
394
 
                 * I believe I can avoid missing the wakeup signal
395
 
                 * by locking before we check if the transaction
396
 
                 * is still running.
397
 
                 *
398
 
                 * Even though db->db_xn_wait_on_cond is "dirty read".
399
 
                 *
400
 
                 * The reason is, before the signal is sent the 
401
 
                 * lock is also aquired. This is not possible until
402
 
                 * this thread is safely sleaping.
403
 
                 */
404
 
                THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
405
 
                my_wt = xt_thr_array[thread->t_id].td_waiting;
406
 
                THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
407
 
 
408
 
                xt_lock_mutex_ns(&my_wt->wt_lock);
409
 
 
410
 
                for (;;) {
411
 
                        if (wait_xact_ptr) {
412
 
                                /* This is a dirty read, but it should work! */
413
 
                                flags = wait_xact_ptr->xd_flags;
414
 
                                start = wait_xact_ptr->xd_start_xn_id;
415
 
                        }
416
 
                        else {
417
 
                                if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, NULL))
418
 
                                        flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
419
 
                        }
420
 
 
421
 
                        if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
422
 
                                /* The transaction has terminated! */
423
 
                                break;
424
 
 
425
 
                        xt_timed_wait_cond_ns(&my_wt->wt_cond, &my_wt->wt_lock, WAIT_FOR_XACT_TIME);
426
 
                }
427
 
 
428
 
                xt_unlock_mutex_ns(&my_wt->wt_lock);
429
 
 
430
 
                wait_done:
431
 
                if (on_wait_list)
432
 
                        xn_remove_from_wait_for(db, &wf, thread);
433
 
        }
434
 
 
435
 
        return OK;
436
 
}
437
 
 
438
 
#else // XT_USE_SPINLOCK_WAIT_FOR
439
 
/*
440
 
 * The given thread must wait for the specified transaction to terminate. This
441
 
 * function places the transaction of the thread on a list of waiting threads.
442
 
 *
443
 
 * Before waiting we make a check for deadlocks. A deadlock occurs
444
 
 * if waiting would introduce a cycle.
445
 
 */
446
 
xtPublic xtBool old_xt_xn_wait_for_xact(XTThreadPtr thread, xtXactID xn_id, xtBool will_retry, XTLockWaitFuncPtr pw_func, XTLockWaitPtr pw_data)
447
 
{
448
 
        XTDatabaseHPtr          db = thread->st_database;
449
 
        XNWaitForRec            wf;
450
 
        int                                     flags = 0;
451
 
        xtXactID                        start = 0;
452
 
 
453
 
        ASSERT_NS(thread->st_xact_data);
454
 
 
455
 
        thread->st_statistics.st_wait_for_xact++;
456
 
        wf.wf_waiting_xn_id = thread->st_xact_data->xd_start_xn_id;
457
 
        wf.wf_for_me_xn_id = xn_id;
458
 
        wf.wf_thread_id = thread->t_id;
459
 
 
460
 
        xt_lock_mutex_ns(&db->db_xn_wait_lock);
461
 
 
462
 
#ifdef TRACE_WAIT_FOR
463
 
        xt_ttracef(thread, "T%lu -wait-> T%lu\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) xn_id);
464
 
#endif
465
 
        for (;;) {
466
 
                if (!xn_get_xact_details(db, xn_id, thread, &flags, &start, NULL, NULL))
467
 
                        break;
468
 
 
469
 
                /* This is a dirty read, but it should work! */
470
 
                if ((flags & XT_XN_XAC_ENDED) || start != xn_id)
471
 
                        break;
472
 
 
473
 
                if (xn_detect_deadlock(db, wf.wf_waiting_xn_id, wf.wf_for_me_xn_id))
474
 
                        goto failed;
475
 
 
476
 
                /* We will wait for this transaction... */
477
 
                db->db_xn_wait_count++;
478
 
                if (thread->st_xact_writer)
479
 
                        db->db_xn_writer_wait_count++;
480
 
 
481
 
                if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id, &wf)) {
482
 
                        db->db_xn_wait_count--;
483
 
                        goto failed;
484
 
                }
485
 
 
486
 
                if (!xn_get_xact_details(db, xn_id, thread, &flags, &start, NULL, NULL)) {
487
 
                        xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
488
 
                        db->db_xn_wait_count--;
489
 
                        if (thread->st_xact_writer)
490
 
                                db->db_xn_writer_wait_count--;
491
 
                        break;
492
 
                }
493
 
 
494
 
                if ((flags & XT_XN_XAC_ENDED) || start != xn_id) {
495
 
                        xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
496
 
                        db->db_xn_wait_count--;
497
 
                        if (thread->st_xact_writer)
498
 
                                db->db_xn_writer_wait_count--;
499
 
                        break;
500
 
                }
501
 
 
502
 
                db->db_xn_post_wait[thread->t_id].pw_call_me = pw_func;
503
 
                db->db_xn_post_wait[thread->t_id].pw_thread = thread;
504
 
                db->db_xn_post_wait[thread->t_id].pw_data = pw_data;
505
 
 
506
 
                /* Timed wait because it is possible that transaction quits before
507
 
                 * we go to sleep.
508
 
                 */
509
 
                if (!xt_timed_wait_cond(NULL, &db->db_xn_wait_cond, &db->db_xn_wait_lock, 2 * 1000)) {
510
 
                        xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
511
 
                        db->db_xn_wait_count--;
512
 
                        if (thread->st_xact_writer)
513
 
                                db->db_xn_writer_wait_count--;
514
 
                        goto failed;
515
 
                }
516
 
 
517
 
                db->db_xn_post_wait[thread->t_id].pw_call_me = NULL;
518
 
                xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
519
 
                db->db_xn_wait_count--;
520
 
                if (thread->st_xact_writer)
521
 
                        db->db_xn_writer_wait_count--;
522
 
                
523
 
                if (will_retry)
524
 
                        break;
525
 
        }
526
 
 
527
 
#ifdef TRACE_WAIT_FOR
528
 
        xt_ttracef(thread, "T%lu -wait-> T%lu DONE\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) xn_id);
529
 
#endif
530
 
        xt_unlock_mutex_ns(&db->db_xn_wait_lock);
531
 
        return OK;
532
 
 
533
 
        failed:
534
 
#ifdef TRACE_WAIT_FOR
535
 
        xt_ttracef(self, "T%lu -wait-> T%lu FAILED\n", (u_long) self->st_xact_data->xd_start_xn_id, (u_long) xn_id);
536
 
#endif
537
 
        xt_unlock_mutex_ns(&db->db_xn_wait_lock);
538
 
        return FAILED;
539
 
}
540
 
 
541
 
xtPublic void old_xt_xn_wakeup_transactions(XTDatabaseHPtr db, XTThreadPtr thread)
542
 
{
543
 
        u_int                   len;
544
 
        XNWaitForPtr    wf;
545
 
 
546
 
        xt_lock_mutex_ns(&db->db_xn_wait_lock);
547
 
        /* The idea here is to release the oldest transactions
548
 
         * first. Although this may not be completely fair
549
 
         * it has the advantage that older transactions are
550
 
         * encouraged to complete first.
551
 
         *
552
 
         * I have found the following problem with this test:
553
 
         * runTest(INCREMENT_TEST, 16, INCREMENT_TEST_UPDATE_COUNT);
554
 
         * with a bit of bad luck a transaction can be starved.
555
 
         * This results in the sweeper stalling because it is
556
 
         * waiting for an old transaction to quite so that
557
 
         * it continue.
558
 
         *
559
 
         * Because the sweeper is waiting, the number of
560
 
         * versions of the record to be updated
561
 
         * begins to increase. In the above test over
562
 
         * 1600 transaction remain uncleaned.
563
 
         *
564
 
         * This means that there are 1600 version of the
565
 
         * row which must be scanned to find the most
566
 
         * recent version.
567
 
         */
568
 
        if ((len = (u_int) xt_sl_get_size(db->db_xn_wait_for))) {
569
 
                for (u_int i=0; i<len; i++) {
570
 
                        wf = (XNWaitForPtr) xt_sl_item_at(db->db_xn_wait_for, i);
571
 
                        if (db->db_xn_post_wait[wf->wf_thread_id].pw_call_me) {
572
 
                                if (db->db_xn_post_wait[wf->wf_thread_id].pw_call_me(thread, &db->db_xn_post_wait[wf->wf_thread_id]))
573
 
                                        db->db_xn_post_wait[wf->wf_thread_id].pw_call_me = NULL;
574
 
                        }
575
 
                }
576
 
                if (!xt_broadcast_cond_ns(&db->db_xn_wait_cond))
577
 
                        xt_log_and_clear_exception_ns();
578
 
        }
579
 
        ASSERT_NS(db->db_xn_wait_count == len);
580
 
        xt_unlock_mutex_ns(&db->db_xn_wait_lock);
581
 
}
582
 
#endif  // XT_USE_SPINLOCK_WAIT_FOR
583
 
 
584
 
/* ----------------------------------------------------------------------
585
 
 * Utilities
586
 
 */
587
 
 
588
 
/* Uncomment to collect debug statistics on transaction structures: */
//#define HIGH_X
#ifdef HIGH_X
/* tot_alloced counts transaction structures handed out by the add functions
 * minus those released by xn_free_xact(); high_alloced is its high-water mark.
 */
u_long tot_alloced, high_alloced;
u_long not_clean_max, in_ram_max;
#endif
595
 
 
596
 
static void xn_free_xact(XTDatabaseHPtr db, XTXactSegPtr seg, XTXactDataPtr xact)
597
 
{
598
 
#ifdef HIGH_X
599
 
        tot_alloced--;
600
 
#endif
601
 
        /* This indicates the structure is free: */
602
 
        xact->xd_start_xn_id = 0;
603
 
        if ((xtWord1 *) xact >= db->db_xn_data && (xtWord1 *) xact < db->db_xn_data_end) {
604
 
                /* Put it in the free list: */
605
 
                xact->xd_next_xact = seg->xs_free_list;
606
 
                seg->xs_free_list = xact;
607
 
                return;
608
 
        }
609
 
        xt_free_ns(xact);
610
 
}
611
 
 
612
 
/*
613
 
 * GOTCHA: The value db->db_xn_curr_id may be a bit larger
614
 
 * than the actual transaction created because there is
615
 
 * a gap between the issude of the transaction ID
616
 
 * and the creation of a memory structure.
617
 
 * (indicated here: {GAP-INC-ADD-XACT})
618
 
 *
619
 
 * This function returns the actuall current transaction ID.
620
 
 * This is the number of the last transaction actually
621
 
 * created in memory.
622
 
 *
623
 
 * This means that if you call xt_xn_get_xact() with any
624
 
 * number less than or equal to this value, not finding
625
 
 * the transaction means it has already ended!
626
 
 */
627
 
xtPublic xtXactID xt_xn_get_curr_id(XTDatabaseHPtr db)
628
 
{
629
 
        int                                             i;
630
 
        xtXactID                                curr_xn_id;
631
 
        register XTXactSegPtr   seg = db->db_xn_idx;
632
 
 
633
 
        /* Find the highest transaction ID actually created... */
634
 
        curr_xn_id = seg->xs_last_xn_id;
635
 
        seg++;
636
 
        for (i=1; i<XT_XN_NO_OF_SEGMENTS; i++, seg++) {
637
 
                if (xt_xn_is_before(curr_xn_id, seg->xs_last_xn_id))
638
 
                        curr_xn_id = seg->xs_last_xn_id;
639
 
        }
640
 
        return curr_xn_id;
641
 
}
642
 
 
643
 
xtPublic XTXactDataPtr xt_xn_add_old_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
644
 
{
645
 
        register XTXactDataPtr  xact;
646
 
        register XTXactSegPtr   seg;
647
 
        register XTXactDataPtr  *hash;
648
 
 
649
 
        (void) thread;
650
 
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
651
 
        XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
652
 
        hash = &seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
653
 
        xact = *hash;
654
 
        while (xact) {
655
 
                if (xact->xd_start_xn_id == xn_id)
656
 
                        goto done_ok;
657
 
                xact = xact->xd_next_xact;
658
 
        }
659
 
 
660
 
        if ((xact = seg->xs_free_list))
661
 
                seg->xs_free_list = xact->xd_next_xact;
662
 
        else {
663
 
                /* We have used up all the free transaction slots,
664
 
                 * the sweeper should work faster to free them
665
 
                 * up...
666
 
                 */
667
 
                db->db_sw_faster |= XT_SW_NO_MORE_XACT_SLOTS;
668
 
                if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
669
 
                        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
670
 
                        return NULL;
671
 
                }
672
 
        }
673
 
 
674
 
        xact->xd_next_xact = *hash;
675
 
        *hash = xact;
676
 
 
677
 
        xact->xd_start_xn_id = xn_id;
678
 
        xact->xd_end_xn_id = 0;
679
 
        xact->xd_end_time = 0;
680
 
        xact->xd_begin_log = 0;
681
 
        xact->xd_flags = 0;
682
 
 
683
 
        /* Get the largest transaction id. */
684
 
        if (xt_xn_is_before(seg->xs_last_xn_id, xn_id))
685
 
                seg->xs_last_xn_id = xn_id;
686
 
 
687
 
        done_ok:
688
 
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
689
 
#ifdef HIGH_X
690
 
        tot_alloced++;
691
 
        if (tot_alloced > high_alloced)
692
 
                high_alloced = tot_alloced;
693
 
#endif
694
 
        return xact;
695
 
}
696
 
 
697
 
static XTXactDataPtr xn_add_new_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
698
 
{
699
 
        register XTXactDataPtr  xact;
700
 
        register XTXactSegPtr   seg;
701
 
        register XTXactDataPtr  *hash;
702
 
 
703
 
        (void) thread;
704
 
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
705
 
        XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
706
 
        hash = &seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
707
 
 
708
 
        if ((xact = seg->xs_free_list))
709
 
                seg->xs_free_list = xact->xd_next_xact;
710
 
        else {
711
 
                /* We have used up all the free transaction slots,
712
 
                 * the sweeper should work faster to free them
713
 
                 * up...
714
 
                 */
715
 
                db->db_sw_faster |= XT_SW_NO_MORE_XACT_SLOTS;
716
 
                if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
717
 
                        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
718
 
                        return NULL;
719
 
                }
720
 
        }
721
 
 
722
 
        xact->xd_next_xact = *hash;
723
 
        *hash = xact;
724
 
 
725
 
        xact->xd_thread_id = thread->t_id;
726
 
        xact->xd_start_xn_id = xn_id;
727
 
        xact->xd_end_xn_id = 0;
728
 
        xact->xd_end_time = 0;
729
 
        xact->xd_begin_log = 0;
730
 
        xact->xd_flags = 0;
731
 
 
732
 
        seg->xs_last_xn_id = xn_id;
733
 
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
734
 
#ifdef HIGH_X
735
 
        tot_alloced++;
736
 
        if (tot_alloced > high_alloced)
737
 
                high_alloced = tot_alloced;
738
 
#endif
739
 
        return xact;
740
 
}
741
 
 
742
 
static xtBool xn_get_xact_details(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), int *flags, xtXactID *start, xtWord4 *end, xtThreadID *thd_id)
743
 
{
744
 
        register XTXactSegPtr   seg;
745
 
        register XTXactDataPtr  xact;
746
 
        xtBool                                  found = FALSE;
747
 
 
748
 
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
749
 
        XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
750
 
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
751
 
        while (xact) {
752
 
                if (xact->xd_start_xn_id == xn_id) {
753
 
                        found = TRUE;
754
 
                        if (flags)
755
 
                                *flags = xact->xd_flags;
756
 
                        if (start)
757
 
                                *start = xact->xd_start_xn_id;
758
 
                        if (end)
759
 
                                *end = xact->xd_end_time;
760
 
                        if (thd_id)
761
 
                                *thd_id = xact->xd_thread_id;
762
 
                        break;
763
 
                }
764
 
                xact = xact->xd_next_xact;
765
 
        }
766
 
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
767
 
        return found;
768
 
}
769
 
 
770
 
static xtBool xn_get_xact_pointer(XTDatabaseHPtr db, xtXactID xn_id, XTXactDataPtr *xact_ptr)
771
 
{
772
 
        register XTXactSegPtr   seg;
773
 
        register XTXactDataPtr  xact;
774
 
        xtBool                                  found = FALSE;
775
 
 
776
 
        *xact_ptr = NULL;
777
 
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
778
 
        XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
779
 
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
780
 
        while (xact) {
781
 
                if (xact->xd_start_xn_id == xn_id) {
782
 
                        found = TRUE;
783
 
                        /* We only return pointers to transaction structures that are permanently
784
 
                         * allocated!
785
 
                         */
786
 
                        if ((xtWord1 *) xact >= db->db_xn_data && (xtWord1 *) xact < db->db_xn_data_end)
787
 
                                *xact_ptr = xact;
788
 
                        break;
789
 
                }
790
 
                xact = xact->xd_next_xact;
791
 
        }
792
 
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
793
 
        return found;
794
 
}
795
 
 
796
 
/*
797
 
 * Note, this function only returns TRUE if the transaction
798
 
 * still needs to be cleaned.
799
 
 */
800
 
static xtBool xn_get_xact_start(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), xtLogID *log_id, xtLogOffset *log_offset)
801
 
{
802
 
        register XTXactSegPtr   seg;
803
 
        register XTXactDataPtr  xact;
804
 
        xtBool                                  found = FALSE;
805
 
 
806
 
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
807
 
        XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
808
 
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
809
 
        while (xact) {
810
 
                if (xact->xd_start_xn_id == xn_id) {
811
 
                        /* Consider only transactions that have not be cleaned! */
812
 
                        if (!(xact->xd_flags & XT_XN_XAC_CLEANED))
813
 
                                found = TRUE;
814
 
                        if (log_id) {
815
 
                                *log_id = xact->xd_begin_log;
816
 
                                *log_offset = xact->xd_begin_offset;
817
 
                        }
818
 
                        break;
819
 
                }
820
 
                xact = xact->xd_next_xact;
821
 
        }
822
 
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
823
 
        return found;
824
 
}
825
 
 
826
 
/* NOTE: this function may only be used by the sweeper or the recovery process. */
827
 
xtPublic XTXactDataPtr xt_xn_get_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread))
828
 
{
829
 
        register XTXactSegPtr   seg;
830
 
        register XTXactDataPtr  xact;
831
 
 
832
 
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
833
 
        XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
834
 
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
835
 
        while (xact) {
836
 
                if (xact->xd_start_xn_id == xn_id)
837
 
                        break;
838
 
                xact = xact->xd_next_xact;
839
 
        }
840
 
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
841
 
        return xact;
842
 
}
843
 
 
844
 
/*
845
 
 * Delete a transaction, return TRUE if the transaction
846
 
 * was found.
847
 
 */
848
 
xtPublic xtBool xt_xn_delete_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
849
 
{
850
 
        XTXactDataPtr   xact, pxact = NULL;
851
 
        XTXactSegPtr    seg;
852
 
 
853
 
        (void) thread;
854
 
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
855
 
        XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
856
 
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
857
 
        while (xact) {
858
 
                if (xact->xd_start_xn_id == xn_id) {
859
 
                        if (pxact)
860
 
                                pxact->xd_next_xact = xact->xd_next_xact;
861
 
                        else
862
 
                                 seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE] = xact->xd_next_xact;
863
 
                        xn_free_xact(db, seg, xact);
864
 
                        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
865
 
                        return TRUE;
866
 
                }
867
 
                pxact = xact;
868
 
                xact = xact->xd_next_xact;
869
 
        }
870
 
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
871
 
        return FALSE;
872
 
}
873
 
 
874
 
//#define DEBUG_RAM_LIST
#ifdef DEBUG_RAM_LIST

/*
 * Debug aid, compiled only when DEBUG_RAM_LIST is defined. It keeps a
 * fixed-size list of transaction IDs (0 = empty entry) protected by
 * check_ram_lock, so that check_ram_min_id() can detect a listed
 * transaction that lies before db->db_xn_min_ram_id.
 */
#define DEBUG_RAM_LIST_SIZE			80

int					check_ram_init_count = 0;
xt_rwlock_type		check_ram_lock;
xtXactID			check_ram_trns[DEBUG_RAM_LIST_SIZE];
int					check_ram_dummy;

/* Reference-counted creation of check_ram_lock. */
static void check_ram_init(void)
{
	if (check_ram_init_count == 0)
		xt_init_rwlock(NULL, &check_ram_lock);
	check_ram_init_count++;
}

/* Reference-counted release of check_ram_lock. */
static void check_ram_free(void)
{
	check_ram_init_count--;
	if (check_ram_init_count == 0)
		xt_free_rwlock(&check_ram_lock);
}

/* Check that no listed transaction is before db->db_xn_min_ram_id. */
static void check_ram_min_id(XTDatabaseHPtr db)
{
	int i;

	xt_slock_rwlock_ns(&check_ram_lock);
	for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
		if (check_ram_trns[i] && xt_xn_is_before(check_ram_trns[i], db->db_xn_min_ram_id)) {
			/* This should never happen! The loop below presumably exists
			 * so that the listed transactions can be inspected in a debugger.
			 */
			XTXactDataPtr x_ptr;

			check_ram_dummy = 0;
			/* Use a separate index so the outer loop's "i" is not clobbered. */
			for (int j=0; j<DEBUG_RAM_LIST_SIZE; j++) {
				if (check_ram_trns[j]) {
					/* xt_xn_get_xact() takes a (unused) thread argument. */
					x_ptr = xt_xn_get_xact(db, check_ram_trns[j], NULL);
					(void) x_ptr;
					check_ram_dummy = 1;
				}
			}
			break;
		}
	}
	xt_unlock_rwlock_ns(&check_ram_lock);
}

/* Store xn_id in the first empty entry; reports if the list is full. */
static void check_ram_add(xtXactID xn_id)
{
	int i;
	
	xt_xlock_rwlock_ns(&check_ram_lock);
	for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
		if (!check_ram_trns[i]) {
			check_ram_trns[i] = xn_id;
			xt_unlock_rwlock_ns(&check_ram_lock);
			return;
		}
	}
	xt_unlock_rwlock_ns(&check_ram_lock);
	printf("DEBUG --- List too small\n");
}

/* Clear the first entry equal to xn_id, if any. */
static void check_ram_del(xtXactID xn_id)
{
	int i;
	
	xt_xlock_rwlock_ns(&check_ram_lock);
	for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
		if (check_ram_trns[i] == xn_id) {
			check_ram_trns[i] = 0;
			xt_unlock_rwlock_ns(&check_ram_lock);
			return;
		}
	}
	xt_unlock_rwlock_ns(&check_ram_lock);
}
#endif
952
 
 
953
 
/* ----------------------------------------------------------------------
954
 
 * Init and Exit
955
 
 */
956
 
 
957
 
xtPublic void xt_xn_init_db(XTThreadPtr self, XTDatabaseHPtr db)
958
 
{
959
 
        XTXactDataPtr   xact;
960
 
        XTXactSegPtr    seg;
961
 
 
962
 
#ifdef DEBUG_RAM_LIST
963
 
        check_ram_init();
964
 
#endif
965
 
        xt_spinlock_init_with_autoname(self, &db->db_xn_id_lock);
966
 
        xt_spinlock_init_with_autoname(self, &db->db_xn_wait_spinlock);
967
 
        xt_init_mutex_with_autoname(self, &db->db_xn_xa_lock);
968
 
        //xt_init_mutex_with_autoname(self, &db->db_xn_wait_lock);
969
 
        //xt_init_cond(self, &db->db_xn_wait_cond);
970
 
        xt_init_mutex_with_autoname(self, &db->db_sw_lock);
971
 
        xt_init_cond(self, &db->db_sw_cond);
972
 
        xt_init_mutex_with_autoname(self, &db->db_wr_lock);
973
 
        xt_init_cond(self, &db->db_wr_cond);
974
 
 
975
 
        /* Pre-allocate transaction data structures: */
976
 
        db->db_xn_data = (xtWord1 *) xt_malloc(self, sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS);
977
 
        db->db_xn_data_end = db->db_xn_data + sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS;
978
 
        xact = (XTXactDataPtr) db->db_xn_data;
979
 
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
980
 
                seg = &db->db_xn_idx[i];
981
 
                XT_XACT_INIT_LOCK(self, &seg->xs_tab_lock);
982
 
                for (u_int j=0;  j<XT_XN_DATA_ALLOC_COUNT; j++) {
983
 
                        xact->xd_next_xact = seg->xs_free_list;
984
 
                        seg->xs_free_list = xact;
985
 
                        xact++;
986
 
                }
987
 
        }
988
 
 
989
 
        /* Create a sorted list for XA transactions recovered: */
990
 
        db->db_xn_xa_list = xt_new_sortedlist(self, sizeof(XTXactXARec), 100, 50, xt_xn_xa_compare, db, NULL, FALSE, FALSE);
991
 
 
992
 
        /* Initialize the data logs: */
993
 
        db->db_datalogs.dlc_init(self, db); 
994
 
 
995
 
        /* Setup the transaction log: */
996
 
        db->db_xlog.xlog_setup(self, db, (off_t) xt_db_log_file_threshold, xt_db_transaction_buffer_size, xt_db_log_file_count);
997
 
 
998
 
        db->db_xn_end_time = 1;
999
 
 
1000
 
        /* Initializing the restart file, also does
1001
 
         * recovery. This returns the log position after recovery.
1002
 
         *
1003
 
         * This is the log position where the writer thread will
1004
 
         * begin. The writer thread writes changes to the database that
1005
 
         * have been flushed to the log.
1006
 
         */
1007
 
        xt_xres_init(self, db);
1008
 
 
1009
 
        /* Initialize the "last transaction in memory", by default
1010
 
         * this is the current transaction ID, which is the ID
1011
 
         * of the last transaction.
1012
 
         */
1013
 
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
1014
 
                seg = &db->db_xn_idx[i];
1015
 
                XT_XACT_INIT_LOCK(self, &seg->xs_tab_lock);
1016
 
                seg->xs_last_xn_id = db->db_xn_curr_id;
1017
 
        }
1018
 
 
1019
 
        /*
1020
 
         * The next transaction to clean is the lowest transaction
1021
 
         * in memory:
1022
 
         */
1023
 
        db->db_xn_to_clean_id = db->db_xn_min_ram_id;
1024
 
#ifdef XT_SWEEPER_SORT_XACTS
1025
 
        db->db_sw_to_add = db->db_xn_min_ram_id;
1026
 
#endif
1027
 
 
1028
 
        /*
1029
 
         * No transactions are running, so the minimum transaction
1030
 
         * ID is the next one to run:
1031
 
         */
1032
 
        db->db_xn_min_run_id = db->db_xn_curr_id + 1;
1033
 
 
1034
 
        db->db_xn_wait_for = xt_new_sortedlist(self, sizeof(XNWaitForRec), 100, 50, xn_compare_wait_for, db, xn_free_wait_for, FALSE, FALSE);
1035
 
}
1036
 
 
1037
 
xtPublic void xt_xn_exit_db(XTThreadPtr self, XTDatabaseHPtr db)
1038
 
{
1039
 
#ifdef HIGH_X
1040
 
        printf("=========> MOST TXs CURR ALLOC: %lu\n", tot_alloced);
1041
 
        printf("=========> MOST TXs HIGH ALLOC: %lu\n", high_alloced);
1042
 
        printf("=========> MAX TXs NOT CLEAN: %lu\n", not_clean_max);
1043
 
        printf("=========> MAX TXs IN RAM: %lu\n", in_ram_max);
1044
 
#endif
1045
 
        XTXactPreparePtr xap, xap_next;
1046
 
 
1047
 
        xt_stop_sweeper(self, db);      // Should be done already!
1048
 
        xt_stop_writer(self, db);       // Should be done already!
1049
 
 
1050
 
        xt_xres_exit(self, db);
1051
 
        db->db_xlog.xlog_exit(self);
1052
 
 
1053
 
        db->db_datalogs.dlc_exit(self); 
1054
 
 
1055
 
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
1056
 
                XTXactSegPtr    seg;
1057
 
 
1058
 
                seg = &db->db_xn_idx[i];
1059
 
                for (u_int j=0; j<XT_XN_HASH_TABLE_SIZE; j++) {
1060
 
                        XTXactDataPtr   xact, nxact;
1061
 
                        
1062
 
                        xact = seg->xs_table[j];
1063
 
                        while (xact) {
1064
 
                                nxact = xact->xd_next_xact;
1065
 
                                xn_free_xact(db, seg, xact);
1066
 
                                xact = nxact;
1067
 
                        }
1068
 
                }
1069
 
                XT_XACT_FREE_LOCK(self, &seg->xs_tab_lock);
1070
 
        }
1071
 
        if (db->db_xn_wait_for) {
1072
 
                xt_free_sortedlist(self, db->db_xn_wait_for);
1073
 
                db->db_xn_wait_for = NULL;
1074
 
        }
1075
 
        if (db->db_xn_data) {
1076
 
                xt_free(self, db->db_xn_data);
1077
 
                db->db_xn_data = NULL;
1078
 
                db->db_xn_data_end = NULL;
1079
 
        }
1080
 
 
1081
 
        xt_free_cond(&db->db_wr_cond);
1082
 
        xt_free_mutex(&db->db_wr_lock);
1083
 
        xt_free_cond(&db->db_sw_cond);
1084
 
        xt_free_mutex(&db->db_sw_lock);
1085
 
        //xt_free_cond(&db->db_xn_wait_cond);
1086
 
        //xt_free_mutex(&db->db_xn_wait_lock);
1087
 
        xt_free_mutex(&db->db_xn_xa_lock);
1088
 
        for (u_int i=0; i<XT_XA_HASH_TAB_SIZE; i++) {
1089
 
                xap = db->db_xn_xa_table[i];
1090
 
                while (xap) {
1091
 
                        xap_next = xap->xp_next;
1092
 
                        xt_free(self, xap);
1093
 
                        xap = xap_next;
1094
 
                }
1095
 
        }
1096
 
        if (db->db_xn_xa_list) {
1097
 
                xt_free_sortedlist(self, db->db_xn_xa_list);
1098
 
                db->db_xn_xa_list = NULL;
1099
 
        }
1100
 
        xt_spinlock_free(self, &db->db_xn_wait_spinlock);
1101
 
        xt_spinlock_free(self, &db->db_xn_id_lock);
1102
 
#ifdef DEBUG_RAM_LIST
1103
 
        check_ram_free();
1104
 
#endif
1105
 
}
1106
 
 
1107
 
xtPublic void xt_xn_init_thread(XTThreadPtr self, int what_for)
1108
 
{
1109
 
        ASSERT(self->st_database);
1110
 
 
1111
 
        if (!xt_init_row_lock_list(&self->st_lock_list))
1112
 
                xt_throw(self);
1113
 
        switch (what_for) {
1114
 
                case XT_FOR_COMPACTOR:
1115
 
                        self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
1116
 
                        break;
1117
 
                case XT_FOR_WRITER:
1118
 
                        /* The writer does not need a transaction buffer. */
1119
 
                        self->st_dlog_buf.dlb_init(self->st_database, 0);
1120
 
                        break;
1121
 
                case XT_FOR_SWEEPER:
1122
 
                case XT_FOR_POOL:
1123
 
                        self->st_dlog_buf.dlb_init(self->st_database, 0);
1124
 
                        break;
1125
 
                case XT_FOR_USER:
1126
 
                        self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
1127
 
                        break;
1128
 
        }
1129
 
}
1130
 
 
1131
 
xtPublic void xt_xn_exit_thread(XTThreadPtr self)
1132
 
{
1133
 
        if (self->st_xact_data)
1134
 
                xt_xn_rollback(self);
1135
 
        self->st_dlog_buf.dlb_exit(self);
1136
 
        xt_exit_row_lock_list(&self->st_lock_list);
1137
 
}
1138
 
 
1139
 
/* ----------------------------------------------------------------------
1140
 
 * Begin and End Transactions
1141
 
 */
1142
 
 
1143
 
xtPublic xtBool xt_xn_begin(XTThreadPtr self)
1144
 
{
1145
 
        XTDatabaseHPtr  db = self->st_database;
1146
 
        xtXactID                xn_id;
1147
 
 
1148
 
        ASSERT(!self->st_xact_data);
1149
 
 
1150
 
        xt_spinlock_lock(&db->db_xn_id_lock);
1151
 
        xn_id = ++db->db_xn_curr_id;
1152
 
        xt_spinlock_unlock(&db->db_xn_id_lock);
1153
 
 
1154
 
#ifdef HIGH_X
1155
 
        if (xt_xn_is_before(not_clean_max, xn_id - db->db_xn_to_clean_id))
1156
 
                not_clean_max = xn_id - db->db_xn_to_clean_id;
1157
 
        if (xt_xn_is_before(in_ram_max, xn_id - db->db_xn_min_ram_id))
1158
 
                in_ram_max = xn_id - db->db_xn_min_ram_id;
1159
 
#endif
1160
 
        /* {GAP-INC-ADD-XACT} This is the gap between incrementing the ID,
1161
 
         * and creating the transaction in memory.
1162
 
         * See xt_xn_get_curr_id().
1163
 
         */
1164
 
 
1165
 
        if (!(self->st_xact_data = xn_add_new_xact(db, xn_id, self)))
1166
 
                return FAILED;
1167
 
        self->st_xact_writer = FALSE;
1168
 
        
1169
 
        /* All transactions that committed before or at this time
1170
 
         * are this one are visible: */
1171
 
        self->st_visible_time = db->db_xn_end_time;
1172
 
 
1173
 
#ifdef TRACE_TRANSACTION
1174
 
        xt_ttracef(self, "BEGIN T%lu\n", (u_long) self->st_xact_data->xd_start_xn_id);
1175
 
#endif
1176
 
#ifdef XT_TRACK_CONNECTIONS
1177
 
        xt_track_conn_info[self->t_id].ci_curr_xact_id = self->st_xact_data->xd_start_xn_id;
1178
 
        xt_track_conn_info[self->t_id].ci_xact_start = xt_trace_clock();
1179
 
#endif
1180
 
        return OK;
1181
 
}
1182
 
 
1183
 
static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
1184
 
{
1185
 
        XTXactDataPtr   xact;
1186
 
        xtBool                  ok = TRUE;
1187
 
 
1188
 
        ASSERT_NS(thread->st_xact_data);
1189
 
        if ((xact = thread->st_xact_data)) {
1190
 
                XTDatabaseHPtr  db = thread->st_database;
1191
 
                xtXactID                xn_id = xact->xd_start_xn_id;
1192
 
                xtBool                  writer;
1193
 
                
1194
 
                if ((writer = thread->st_xact_writer)) {
1195
 
                        /* The transaction wrote something: */
1196
 
                        XTXactEndEntryDRec      entry;
1197
 
                        xtWord4                         sum;
1198
 
 
1199
 
                        sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
1200
 
                        entry.xe_status_1 = status;
1201
 
                        entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
1202
 
                        XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
1203
 
                        XT_SET_DISK_4(entry.xe_not_used_4, 0);
1204
 
 
1205
 
#ifdef XT_IMPLEMENT_NO_ACTION
1206
 
                        /* This will check any resticts that have been delayed to the end of the statement. */
1207
 
                        if (thread->st_restrict_list.bl_count) {
1208
 
                                if (!xt_tab_restrict_rows(&thread->st_restrict_list, thread)) {
1209
 
                                        ok = FALSE;
1210
 
                                        status = XT_LOG_ENT_ABORT;
1211
 
                                }
1212
 
                        }
1213
 
#endif
1214
 
 
1215
 
                        /* Flush the data log: */
1216
 
                        if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
1217
 
                                ok = FALSE;
1218
 
                                status = XT_LOG_ENT_ABORT;
1219
 
                        }
1220
 
 
1221
 
                        /* Write and flush the transaction log:
1222
 
                         * We only flush if this was not a temp table.
1223
 
                         */
1224
 
                        if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, thread->st_non_temp_updated ? xt_db_flush_log_at_trx_commit : XT_XLOG_NO_WRITE_NO_FLUSH)) {
1225
 
                                ok = FALSE;
1226
 
                                status = XT_LOG_ENT_ABORT;
1227
 
                                /* Make sure this is done, if we failed to log
1228
 
                                 * the transction end!
1229
 
                                 */
1230
 
                                if (thread->st_xact_writer) {
1231
 
                                        /* Adjust this in case of error, but don't forget
1232
 
                                         * to lock!
1233
 
                                         */
1234
 
                                        xt_spinlock_lock(&db->db_xlog.xl_buffer_lock);
1235
 
                                        db->db_xn_writer_count--;
1236
 
                                        thread->st_xact_writer = FALSE;
1237
 
                                        if (thread->st_xact_long_running) {
1238
 
                                                db->db_xn_long_running_count--;
1239
 
                                                thread->st_xact_long_running = FALSE;
1240
 
                                        }
1241
 
                                        xt_spinlock_unlock(&db->db_xlog.xl_buffer_lock);
1242
 
                                }
1243
 
                        }
1244
 
 
1245
 
                        /* Setting this flag completes the transaction,
1246
 
                         * Do this before we release the locks, because
1247
 
                         * the unlocked transactions expect the
1248
 
                         * transaction they are waiting for to be
1249
 
                         * gone!
1250
 
                         */
1251
 
                        xact->xd_end_time = ++db->db_xn_end_time;
1252
 
                        if (status == XT_LOG_ENT_COMMIT) {
1253
 
                                thread->st_statistics.st_commits++;
1254
 
                                xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
1255
 
                        }
1256
 
                        else {
1257
 
                                thread->st_statistics.st_rollbacks++;
1258
 
                                xact->xd_flags |= XT_XN_XAC_ENDED;
1259
 
                        }
1260
 
 
1261
 
                        /* {REMOVE-LOCKS} Drop locks is you have any: */
1262
 
                        thread->st_lock_list.xt_remove_all_locks(db, thread);
1263
 
 
1264
 
                        /* Do this afterwards to make sure the sweeper
1265
 
                         * does not cleanup transactions start cleaning up
1266
 
                         * before any transactions that were waiting for
1267
 
                         * this transaction have completed!
1268
 
                         */
1269
 
                        xact->xd_end_xn_id = db->db_xn_curr_id;
1270
 
 
1271
 
                        /* Now you can sweep! */
1272
 
                        ASSERT_NS(xact->xd_flags & XT_XN_XAC_LOGGED);
1273
 
                        xact->xd_flags |= XT_XN_XAC_SWEEP;
1274
 
                }
1275
 
                else {
1276
 
                        /* Read-only transaction can be removed, immediately */
1277
 
                        xact->xd_end_time = ++db->db_xn_end_time;
1278
 
                        xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
1279
 
 
1280
 
                        /* Drop locks is you have any: */
1281
 
                        thread->st_lock_list.xt_remove_all_locks(db, thread);
1282
 
 
1283
 
                        xact->xd_end_xn_id = db->db_xn_curr_id;
1284
 
 
1285
 
                        ASSERT_NS(!(xact->xd_flags & XT_XN_XAC_LOGGED));
1286
 
                        xact->xd_flags |= XT_XN_XAC_SWEEP;
1287
 
 
1288
 
                        if (xt_xn_delete_xact(db, xn_id, thread)) {
1289
 
                                if (db->db_xn_min_ram_id == xn_id)
1290
 
                                        db->db_xn_min_ram_id = xn_id+1;
1291
 
                        }
1292
 
                }
1293
 
 
1294
 
#ifdef TRACE_TRANSACTION
1295
 
                if (status == XT_LOG_ENT_COMMIT)
1296
 
                        xt_ttracef(thread, "COMMIT T%lu\n", (u_long) xn_id);
1297
 
                else
1298
 
                        xt_ttracef(thread, "ABORT T%lu\n", (u_long) xn_id);
1299
 
#endif
1300
 
 
1301
 
                if (db->db_xn_min_run_id == xn_id)
1302
 
                        db->db_xn_min_run_id = xn_id+1;
1303
 
 
1304
 
                thread->st_xact_data = NULL;
1305
 
 
1306
 
#ifdef XT_TRACK_CONNECTIONS
1307
 
                xt_track_conn_info[thread->t_id].ci_prev_xact_id = xt_track_conn_info[thread->t_id].ci_curr_xact_id;
1308
 
                xt_track_conn_info[thread->t_id].ci_prev_xact_time = xt_trace_clock() - xt_track_conn_info[thread->t_id].ci_xact_start;
1309
 
                xt_track_conn_info[thread->t_id].ci_curr_xact_id = 0;
1310
 
                xt_track_conn_info[thread->t_id].ci_xact_start = 0;
1311
 
#endif
1312
 
 
1313
 
                xt_wakeup_waiting_threads(thread);
1314
 
 
1315
 
                /* {WAKE-SW} Waking the sweeper
1316
 
                 * is no longer unconditional.
1317
 
                 * (see all comments to {WAKE-SW})
1318
 
                 *
1319
 
                 * We now wake the sweeper if it is
1320
 
                 * supposed to work faster.
1321
 
                 *
1322
 
                 * There are now 2 cases:
1323
 
                 * - We run out of transaction slots.
1324
 
                 * - We encounter old index entries.
1325
 
                 *
1326
 
                 * The following test:
1327
 
                 * runTest(INCREMENT_TEST, 16, INCREMENT_TEST_UPDATE_COUNT);
1328
 
                 * has extreme problems with sweeping every 1/10s
1329
 
                 * because a huge number of index entries accumulate
1330
 
                 * that need to be cleaned.
1331
 
                 *
1332
 
                 * New code detects this case.
1333
 
                 */
1334
 
                if (db->db_sw_faster)
1335
 
                        xt_wakeup_sweeper(db);
1336
 
 
1337
 
                /* Don't get too far ahead of the sweeper! */
1338
 
                if (writer) {
1339
 
#ifdef XT_WAIT_FOR_CLEANUP
1340
 
                        if (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) {
1341
 
                                /* Set a maximum wait time (1/100s) */
1342
 
                                xtWord8         then = xt_trace_clock() + (xtWord8) 100000;
1343
 
                                xtXactID        wait_xn_id;
1344
 
                                
1345
 
                                /* This is the transaction that was committed 3 transactions ago: */
1346
 
                                wait_xn_id = thread->st_prev_xact[thread->st_last_xact];
1347
 
                                thread->st_prev_xact[thread->st_last_xact] = xn_id;
1348
 
                                /* This works because XT_MAX_XACT_BEHIND == 2! */
1349
 
                                ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == (thread->st_last_xact ^ 1));
1350
 
                                thread->st_last_xact ^= 1;
1351
 
 
1352
 
                                while (xt_xn_is_before(db->db_xn_to_clean_id, wait_xn_id) && (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)) {
1353
 
                                        if (xt_trace_clock() >= then)
1354
 
                                                break;
1355
 
#ifdef XT_SWEEPER_SORT_XACTS
1356
 
                                        if (!xn_get_xact_start(db, wait_xn_id, thread, NULL, NULL))
1357
 
                                                break;
1358
 
#endif
1359
 
                                        xt_critical_wait();
1360
 
                                }
1361
 
                        }
1362
 
#else
1363
 
                        if ((db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) != 0) {
1364
 
                                xtWord8 then = xt_trace_clock() + (xtWord8) 20000000;
1365
 
 
1366
 
                                for (;;) {
1367
 
                                        xt_critical_wait();
1368
 
                                        if (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)
1369
 
                                                break;
1370
 
                                        if (xt_trace_clock() >= then)
1371
 
                                                break;
1372
 
                                }
1373
 
                        }
1374
 
#endif
1375
 
                }
1376
 
        }
1377
 
        return ok;
1378
 
}
1379
 
 
1380
 
xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
1381
 
{
1382
 
        return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
1383
 
}
1384
 
 
1385
 
xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
1386
 
{
1387
 
        return xn_end_xact(thread, XT_LOG_ENT_ABORT);
1388
 
}
1389
 
 
1390
 
xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
1391
 
{
1392
 
        XTXactNewTabEntryDRec   entry;
1393
 
 
1394
 
        entry.xt_status_1 = XT_LOG_ENT_NEW_TAB;
1395
 
        entry.xt_checksum_1 = XT_CHECKSUM_1(tab_id);
1396
 
        XT_SET_DISK_4(entry.xt_tab_id_4, tab_id);
1397
 
        return xt_xlog_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_WRITE_AND_FLUSH);
1398
 
}
1399
 
 
1400
 
xtPublic int xt_xn_status(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID XT_UNUSED(rec_id))
1401
 
{
1402
 
        register XTThreadPtr    self = ot->ot_thread;
1403
 
        int                                             flags;
1404
 
        xtWord4                                 end;
1405
 
 
1406
 
#ifdef DRIZZLED
1407
 
        /* Conditional waste of time!
1408
 
         * Drizzle has strict warnings.
1409
 
         * I know this is not necessary!
1410
 
         */
1411
 
        flags = 0;
1412
 
        end = 0;
1413
 
#endif
1414
 
        if (xn_id == self->st_xact_data->xd_start_xn_id)
1415
 
                return XT_XN_MY_UPDATE;
1416
 
        if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id) ||
1417
 
                !xn_get_xact_details(self->st_database, xn_id, ot->ot_thread, &flags, NULL, &end, NULL)) {
1418
 
                /* Not in RAM, rollback done: */
1419
 
//*DBG*/xt_dump_xlogs(self->st_database, 0);
1420
 
//*DBG*/xt_check_table(self, ot);
1421
 
//*DBG*/xt_dump_trace();
1422
 
                /* {XACT-NOT-IN-RAM}
1423
 
                 * This should never happen (CHANGED see below)!
1424
 
                 *
1425
 
                 * Because if the transaction is no longer in RAM, then it has been
1426
 
                 * cleaned up. So the record should be marked as clean, or not
1427
 
                 * exist.
1428
 
                 *
1429
 
                 * After sweeping, we wait for all transactions to quit that were
1430
 
                 * running at the time of cleanup before removing the transaction record.
1431
 
                 * (see {XACT-NOT-IN-RAM})
1432
 
                 *
1433
 
                 * If this was not the case, then we could be here because:
1434
 
                 * - The user transaction (T2) reads record x and notes that the record
1435
 
                 * has not been cleaned (CLEAN bit not set).
1436
 
                 *
1437
 
                 * - The sweeper is busy sweeping the transaction (T1) that created
1438
 
                 * record x.
1439
 
                 * The SW sets the CLEAN bit on record x, and the schedules T1 for
1440
 
                 * deletion.
1441
 
                 *
1442
 
                 * Now T1 should not be deleted before T2 quits. If it does happen
1443
 
                 * then we land up here.
1444
 
                 *
1445
 
                 * THIS CAN NOW HAPPEN!
1446
 
                 *
1447
 
                 * First of all, a MYSTERY:
1448
 
                 * This did happen, dispite the description above! The reason why
1449
 
                 * is left as an exercise to the reader (really, I don't now why!)
1450
 
                 *
1451
 
                 * This has force me to add code to handle the situation. This
1452
 
                 * is done by re-reading the record that is being checked by this
1453
 
                 * function. After re-reading, the record should either be
1454
 
                 * invalid (free) or clean (CLEAN bit set).
1455
 
                 *
1456
 
                 * If this is the case, then we will not run land up here
1457
 
                 * again.
1458
 
                 *
1459
 
                 * Because we are only here because the record was valid but not
1460
 
                 * clean (you can confirm this by looking at the code that
1461
 
                 * calls this function).
1462
 
                 *
1463
 
                 * See {RETRY-READ}
1464
 
                 */
1465
 
                return XT_XN_REREAD;
1466
 
        }
1467
 
        if (!(flags & XT_XN_XAC_ENDED))
1468
 
                /* Transaction not ended, may be visible. */
1469
 
                return XT_XN_OTHER_UPDATE;
1470
 
        /* Visible if the transaction was committed: */
1471
 
        if (flags & XT_XN_XAC_COMMITTED) {
1472
 
                if (!xt_xn_is_before(self->st_visible_time, end))  // was self->st_visible_time >= xact->xd_end_time
1473
 
                        return XT_XN_VISIBLE;
1474
 
                return XT_XN_NOT_VISIBLE;
1475
 
        }
1476
 
        return XT_XN_ABORTED;
1477
 
}
1478
 
 
1479
 
/* ----------------------------------------------------------------------
1480
 
 * XA Functionality
1481
 
 */
1482
 
 
1483
 
xtPublic int xt_xn_xa_compare(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
1484
 
{
1485
 
        xtXactID        *x = (xtXactID *) a;
1486
 
        XTXactXAPtr     y = (XTXactXAPtr) b;
1487
 
 
1488
 
        if (*x == y->xx_xact_id)
1489
 
                return 0;
1490
 
        if (xt_xn_is_before(*x, y->xx_xact_id))
1491
 
                return -1;
1492
 
        return 1;
1493
 
}
1494
 
 
1495
 
xtPublic xtBool xt_xn_prepare(int len, xtWord1 *xa_data, XTThreadPtr thread)
1496
 
{
1497
 
        XTXactDataPtr xact;
1498
 
 
1499
 
        ASSERT_NS(thread->st_xact_data);
1500
 
        if ((xact = thread->st_xact_data)) {
1501
 
                xtXactID xn_id = xact->xd_start_xn_id;
1502
 
 
1503
 
                /* Only makes sense if the transaction has already been logged: */
1504
 
                if ((thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
1505
 
                        if (!xt_xlog_modify_table(0, XT_LOG_ENT_PREPARE, xn_id, 0, 0, 0, len, xa_data, thread))
1506
 
                                return FAILED;
1507
 
                }
1508
 
        }
1509
 
        return OK;
1510
 
}
1511
 
 
1512
 
xtPublic xtBool xt_xn_store_xa_data(XTDatabaseHPtr db, xtXactID xact_id, int len, xtWord1 *xa_data, XTThreadPtr XT_UNUSED(thread))
1513
 
{
1514
 
        XTXactPreparePtr        xap;
1515
 
        u_int                           idx;
1516
 
        XTXactXARec                     xx;
1517
 
 
1518
 
        if (!(xap = (XTXactPreparePtr) xt_malloc_ns(offsetof(XTXactPrepareRec, xp_xa_data) + len)))
1519
 
                return FAILED;
1520
 
        xap->xp_xact_id = xact_id;
1521
 
        xap->xp_hash = xt_get_checksum4(xa_data, len);
1522
 
        xap->xp_data_len = len;
1523
 
        memcpy(xap->xp_xa_data, xa_data, len);
1524
 
        xx.xx_xact_id = xact_id;
1525
 
        xx.xx_xa_ptr = xap;
1526
 
 
1527
 
        idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
1528
 
        xt_lock_mutex_ns(&db->db_xn_xa_lock);
1529
 
        if (!xt_sl_insert(NULL, db->db_xn_xa_list, &xact_id, &xx)) {
1530
 
                xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1531
 
                xt_free_ns(xap);
1532
 
        }
1533
 
        xap->xp_next = db->db_xn_xa_table[idx];
1534
 
        db->db_xn_xa_table[idx] = xap;
1535
 
        xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1536
 
        return OK;
1537
 
}
1538
 
 
1539
 
xtPublic void xt_xn_delete_xa_data_by_xact(XTDatabaseHPtr db, xtXactID xact_id, XTThreadPtr thread)
1540
 
{
1541
 
        XTXactXAPtr xx;
1542
 
 
1543
 
        xt_lock_mutex_ns(&db->db_xn_xa_lock);
1544
 
        if (!(xx = (XTXactXAPtr) xt_sl_find(NULL, db->db_xn_xa_list, &xact_id)))
1545
 
                return;
1546
 
        xt_xn_delete_xa_data(db, xx->xx_xa_ptr, TRUE, thread);
1547
 
}
1548
 
 
1549
 
xtPublic void xt_xn_delete_xa_data(XTDatabaseHPtr db, XTXactPreparePtr xap, xtBool unlock, XTThreadPtr XT_UNUSED(thread))
1550
 
{
1551
 
        u_int                           idx;
1552
 
        XTXactPreparePtr        xap_ptr, xap_pptr = NULL;
1553
 
 
1554
 
        xt_sl_delete(NULL, db->db_xn_xa_list, &xap->xp_xact_id);
1555
 
        idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
1556
 
        xap_ptr = db->db_xn_xa_table[idx];
1557
 
        while (xap_ptr) {
1558
 
                if (xap_ptr == xap)
1559
 
                        break;
1560
 
                xap_pptr = xap_ptr;
1561
 
                xap_ptr = xap_ptr->xp_next;
1562
 
        }
1563
 
        if (xap_ptr) {
1564
 
                if (xap_pptr)
1565
 
                        xap_pptr->xp_next = xap_ptr->xp_next;
1566
 
                else
1567
 
                        db->db_xn_xa_table[idx] = xap_ptr->xp_next;
1568
 
                xt_free_ns(xap);
1569
 
        }
1570
 
        if (unlock)
1571
 
                xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1572
 
}
1573
 
 
1574
 
xtPublic XTXactPreparePtr xt_xn_find_xa_data(XTDatabaseHPtr db, int len, xtWord1 *xa_data, xtBool lock, XTThreadPtr XT_UNUSED(thread))
1575
 
{
1576
 
        xtWord4                         hash;
1577
 
        XTXactPreparePtr        xap;
1578
 
        u_int                           idx;
1579
 
 
1580
 
        if (lock)
1581
 
                xt_lock_mutex_ns(&db->db_xn_xa_lock);
1582
 
        hash = xt_get_checksum4(xa_data, len);
1583
 
        idx = hash % XT_XA_HASH_TAB_SIZE;
1584
 
        xap = db->db_xn_xa_table[idx];
1585
 
        while (xap) {
1586
 
                if (xap->xp_hash == hash &&
1587
 
                        xap->xp_data_len == len &&
1588
 
                        memcmp(xap->xp_xa_data, xa_data, len) == 0) {
1589
 
                        break;
1590
 
                }
1591
 
                xap = xap->xp_next;
1592
 
        }
1593
 
        
1594
 
        return xap;
1595
 
}
1596
 
 
1597
 
xtPublic XTXactPreparePtr xt_xn_enum_xa_data(XTDatabaseHPtr db, XTXactEnumXAPtr exa)
1598
 
{
1599
 
        XTXactXAPtr xx;
1600
 
 
1601
 
        if (!exa->exa_locked) {
1602
 
                xt_lock_mutex_ns(&db->db_xn_xa_lock);
1603
 
                exa->exa_locked = TRUE;
1604
 
        }
1605
 
 
1606
 
        if ((xx = (XTXactXAPtr) xt_sl_item_at(db->db_xn_xa_list, exa->exa_index))) {
1607
 
                exa->exa_index++;
1608
 
                return xx->xx_xa_ptr;
1609
 
        }
1610
 
 
1611
 
        if (exa->exa_locked) {
1612
 
                exa->exa_locked = FALSE;
1613
 
                xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1614
 
        }
1615
 
        return NULL;
1616
 
}
1617
 
 
1618
 
/* ----------------------------------------------------------------------
1619
 
 * S W E E P E R    F U N C T I O N S
1620
 
 */
1621
 
 
1622
 
xtPublic xtWord8 xt_xn_bytes_to_sweep(XTDatabaseHPtr db, XTThreadPtr thread)
1623
 
{
1624
 
        xtXactID                                xn_id;
1625
 
        xtXactID                                curr_xn_id;
1626
 
        xtLogID                                 xn_log_id = 0;
1627
 
        xtLogOffset                             xn_log_offset = 0;
1628
 
        xtLogID                                 x_log_id = 0;
1629
 
        xtLogOffset                             x_log_offset = 0;
1630
 
        xtLogID                                 log_id;
1631
 
        xtLogOffset                             log_offset;
1632
 
        xtWord8                                 byte_count = 0;
1633
 
 
1634
 
        xn_id = db->db_xn_to_clean_id;
1635
 
        curr_xn_id = xt_xn_get_curr_id(db);
1636
 
        // Limit the number of transactions checked!
1637
 
        for (int i=0; i<1000; i++) {
1638
 
                if (xt_xn_is_before(curr_xn_id, xn_id))
1639
 
                        break;
1640
 
                if (xn_get_xact_start(db, xn_id, thread, &x_log_id, &x_log_offset)) {
1641
 
                        if (xn_log_id) {
1642
 
                                if (xt_comp_log_pos(x_log_id, x_log_offset, xn_log_id, xn_log_offset) < 0) {
1643
 
                                        xn_log_id = x_log_id;
1644
 
                                        xn_log_offset = x_log_offset;
1645
 
                                }
1646
 
                        }
1647
 
                        else {
1648
 
                                xn_log_id = x_log_id;
1649
 
                                xn_log_offset = x_log_offset;
1650
 
                        }
1651
 
                }
1652
 
                xn_id++;
1653
 
        }
1654
 
        if (!xn_log_id)
1655
 
                return 0;
1656
 
 
1657
 
        /* Assume the logs have the threshold: */
1658
 
        log_id = db->db_xlog.xl_write_log_id;
1659
 
        log_offset = db->db_xlog.xl_write_log_offset;
1660
 
        if (xn_log_id < log_id) {
1661
 
                if (xn_log_offset < xt_db_log_file_threshold)
1662
 
                        byte_count = (size_t) (xt_db_log_file_threshold - xn_log_offset);
1663
 
                xn_log_offset = 0;
1664
 
                xn_log_id++;
1665
 
        }
1666
 
        while (xn_log_id < log_id) {
1667
 
                byte_count += (size_t) xt_db_log_file_threshold;
1668
 
                xn_log_id++;
1669
 
        }
1670
 
        if (xn_log_offset < log_offset)
1671
 
                byte_count += (size_t) (log_offset - xn_log_offset);
1672
 
 
1673
 
        return byte_count;
1674
 
}
1675
 
 
1676
 
/* ----------------------------------------------------------------------
1677
 
 * S W E E P E R    P R O C E S S
1678
 
 */
1679
 
 
1680
 
typedef struct XNSweeperState {
1681
 
        XTDatabaseHPtr                  ss_db;
1682
 
        XTXactSeqReadRec                ss_seqread;
1683
 
        XTDataBufferRec                 ss_databuf;
1684
 
        u_int                                   ss_call_cnt;
1685
 
        XTBasicQueueRec                 ss_to_free;
1686
 
        xtBool                                  ss_flush_pending;
1687
 
        xtTableID                               ss_not_found;                           /* Cache the last table not found, this saves time. */
1688
 
        xtTableID                               ss_not_recovered;                       /* Cache the last table not recovered. */
1689
 
        XTOpenTablePtr                  ss_ot;
1690
 
} XNSweeperStateRec, *XNSweeperStatePtr;
1691
 
 
1692
 
/*
1693
 
 * This function NULL if the table cannot be opened.
1694
 
 * In this case cleanup_done will be set to TRUE
1695
 
 * if the cleanup should be skipped.
1696
 
 *
1697
 
 */
1698
 
static XTOpenTablePtr xn_sw_get_open_table(XTThreadPtr self, XNSweeperStatePtr ss, xtTableID tab_id, xtBool *skip_cleanup)
1699
 
{
1700
 
        if (ss->ss_ot) {
1701
 
                if (ss->ss_ot->ot_table->tab_id == tab_id)
1702
 
                        return ss->ss_ot;
1703
 
 
1704
 
                xt_db_return_table_to_pool(self, ss->ss_ot);
1705
 
                ss->ss_ot = NULL;
1706
 
        }
1707
 
 
1708
 
        if (ss->ss_not_found == tab_id || ss->ss_not_recovered == tab_id) {
1709
 
                *skip_cleanup = TRUE;
1710
 
                return NULL;
1711
 
        }
1712
 
 
1713
 
        if (!ss->ss_ot) {
1714
 
                int r;
1715
 
 
1716
 
                if (!(ss->ss_ot = xt_db_open_pool_table(self, ss->ss_db, tab_id, &r, TRUE))) {
1717
 
                        switch (r) {
1718
 
                                case XT_TAB_NOT_FOUND:
1719
 
                                        /* Remember the table if it was not found: */
1720
 
                                        ss->ss_not_found = tab_id;
1721
 
                                        *skip_cleanup = TRUE;
1722
 
                                        break;
1723
 
                                case XT_TAB_NO_DICTIONARY:
1724
 
                                case XT_TAB_POOL_CLOSED:
1725
 
                                        *skip_cleanup = FALSE;
1726
 
                                        break;
1727
 
                                default:
1728
 
                                        *skip_cleanup = TRUE;
1729
 
                                        break;
1730
 
                        }
1731
 
                        return NULL;
1732
 
                }
1733
 
                
1734
 
                /* Don't sweep transactions for table that have not been
1735
 
                 * recovered.
1736
 
                 */
1737
 
                if (ss->ss_ot->ot_table->tab_recovery_not_done) {
1738
 
                        xt_db_return_table_to_pool(self, ss->ss_ot);
1739
 
                        ss->ss_ot = NULL;
1740
 
                        ss->ss_not_recovered = tab_id;
1741
 
                        *skip_cleanup = TRUE;
1742
 
                        return NULL;
1743
 
                }
1744
 
        }
1745
 
 
1746
 
        return ss->ss_ot;
1747
 
}
1748
 
 
1749
 
static void xn_sw_close_open_table(XTThreadPtr self, XNSweeperStatePtr ss)
1750
 
{
1751
 
        if (ss->ss_ot) {
1752
 
                xt_db_return_table_to_pool(self, ss->ss_ot);
1753
 
                ss->ss_ot = NULL;
1754
 
        }
1755
 
}
1756
 
 
1757
 
/*
1758
 
 * A thread can set a bit in db_sw_faster to make
1759
 
 * the sweeper go faster.
1760
 
 */
1761
 
static void xn_sw_could_go_faster(XTThreadPtr self, XTDatabaseHPtr db)
1762
 
{
1763
 
        if (db->db_sw_faster) {
1764
 
                if (!db->db_sw_fast) {
1765
 
                        xt_set_priority(self, xt_db_sweeper_priority+1);
1766
 
                        db->db_sw_fast = TRUE;
1767
 
                }
1768
 
        }
1769
 
}
1770
 
 
1771
 
static void xn_sw_go_slower(XTThreadPtr self, XTDatabaseHPtr db)
1772
 
{
1773
 
        if (db->db_sw_fast) {
1774
 
                xt_set_priority(self, xt_db_sweeper_priority);
1775
 
                db->db_sw_fast = FALSE;
1776
 
        }
1777
 
        db->db_sw_faster = XT_SW_WORK_NORMAL;
1778
 
}
1779
 
 
1780
 
/* Add a record to the "to free" queue. We note the current
1781
 
 * transaction at the time this is done. The record will
1782
 
 * only be freed once this transaction terminates, together
1783
 
 * with all transactions that started before it! 
1784
 
 *
1785
 
 * The reason for this is that a sequential scan or some
1786
 
 * other operation may read a committed record which is no longer
1787
 
 * valid because it is no longer the latest variation (the first
1788
 
 * variation reachable from the row pointer).
1789
 
 *
1790
 
 * In this case, the sweeper will free the variation.
1791
 
 * If the variation is re-used and committed before
1792
 
 * the sequential scan or read completes, and by some
1793
 
 * fluke is used by the same record as previously,
1794
 
 * the system will think the record is valid
1795
 
 * again.
1796
 
 *
1797
 
 * Without re-reading the record the sequential
1798
 
 * scan or other read will find it on the variation list, and
1799
 
 * return the record data as if valid!
1800
 
 *
1801
 
 * ------------ 2008-01-03
1802
 
 *
1803
 
 * An example of this is:
1804
 
 *
1805
 
 * Assume we have 3 records.
1806
 
 * The 3rd record is deleted, and committed.
1807
 
 * Before cleanup can be performed
1808
 
 * a sequential scan takes a copy of the records.
1809
 
 *
1810
 
 * Now assume a new insert is done before
1811
 
 * the sequential scan gets to the 3rd record.
1812
 
 *
1813
 
 * The insert allocates the 3rd row and 3rd record
1814
 
 * again.
1815
 
 *
1816
 
 * Now, when the sequential scan gets to the old copy of the 3rd record,
1817
 
 * this is valid because the row points to this record again.
1818
 
 *
1819
 
 * HOWEVER! I have now changed the sequential scan so that it accesses
1820
 
 * the records from the cache, without making a copy.
1821
 
 *
1822
 
 * This means that this problem cannot occur because the sequential scan
1823
 
 * always reads the current data from the cache.
1824
 
 *
1825
 
 * There is also no race condition (although no lock is taken), because
1826
 
 * the record is written before the row (see here [(5)]).
1827
 
 *
1828
 
 * This means that the row does not point to the record before the
1829
 
 * record has been modified.
1830
 
 *
1831
 
 * Once the record has been modified then the sequential scan will see
1832
 
 * that the record belongs to a new transaction.
1833
 
 *
1834
 
 * If the row pointer was set before the record updated then a race
1835
 
 * condition would exist when the sequential scan reads the record
1836
 
 * after the insert has updated the row pointer but before it has
1837
 
 * changed the record.
1838
 
 *
1839
 
 * AS A RESULT:
1840
 
 *
1841
 
 * I believe I can remove the delayed free record!
1842
 
 *
1843
 
 * This means I can combine the REMOVE and FREE operations.
1844
 
 *
1845
 
 * This is good because this takes care of the problem
1846
 
 * that records are lost when:
1847
 
 *
1848
 
 * The server crashes when the delayed free list still has items on it.
1849
 
 * AND
1850
 
 * The transaction that freed the records has been cleaned, and this
1851
 
 * fact has been committed to the log.
1852
 
 *
1853
 
 * So I have removed the delay here: [(6)]
1854
 
 *
1855
 
 * ------------ 2008-12-03
1856
 
 *
1857
 
 * This code to delay removal of records was finally removed (see above)
1858
 
 */
1859
 
 
1860
 
/*
1861
 
 * As above, but instead a transaction is added to the "to free" queue.
1862
 
 *
1863
 
 * It is important that transactions remain in memory until all
1864
 
 * currently running transactions have ended. This is because
1865
 
 * sequential and index scans have copies of old data.
1866
 
 *
1867
 
 * In the old data a record may not be indicated as cleaned. Such
1868
 
 * a record is considered invalid if the transaction is not in RAM.
1869
 
 *
1870
 
 * GOTCHA:
1871
 
 *
1872
 
 * And this problem is demonstrated by the following example
1873
 
 * which was derived from flush_table.test.
1874
 
 *
1875
 
 * Each handler command below is a separate transaction.
1876
 
 * However the buffer is loaded by 'read first'.
1877
 
 * Depending on when cleanup occurs, records can disappear
1878
 
 * in some of the next commands.
1879
 
 *
1880
 
 * 2 solutions for the test. Use begin ... commit around
1881
 
 * handler open ... close. Or use analyze table t1 before
1882
 
 * open. analyze table waits for the sweeper to complete!
1883
 
 *
1884
 
 * create table dummy(table_id char(20) primary key);
1885
 
 * let $1=100;
1886
 
 * while ($1)
1887
 
 * {
1888
 
 *   drop table if exists t1;
1889
 
 *   create table t1(table_id char(20) primary key);
1890
 
 *   insert into t1 values ('Record-01');
1891
 
 *   insert into t1 values ('Record-02');
1892
 
 *   insert into t1 values ('Record-03');
1893
 
 *   insert into t1 values ('Record-04');
1894
 
 *   insert into t1 values ('Record-05');
1895
 
 *   handler t1 open;
1896
 
 *   handler t1 read first limit 1;
1897
 
 *   handler t1 read next limit 1;
1898
 
 *   handler t1 read next limit 1;
1899
 
 *   handler t1 read next limit 1;
1900
 
 *   handler t1 close;
1901
 
 *   commit;
1902
 
 *   dec $1;
1903
 
 * }
1904
 
 * 
1905
 
 */
1906
 
#ifdef MUST_DELAY_REMOVE
/*
 * Append transaction xn_id to the sweeper's "to free" queue. The entry is
 * tagged with the database's current transaction ID (db_xn_curr_id) at
 * the time of the call. Only compiled when MUST_DELAY_REMOVE is defined.
 */
static void xn_sw_add_xact_to_free(XTThreadPtr self, XNSweeperStatePtr ss, xtXactID xn_id)
{
	XNSWToFreeItemRec	item;

	if (ss->ss_to_free.bq_front - ss->ss_to_free.bq_back >= XT_TN_MAX_TO_FREE) {
		/* The queue is full. Only every XT_TN_MAX_TO_FREE_CHECK-th
		 * overflow tries to free some items, so this is not done on
		 * every call.
		 *
		 * GOTCHA: This call was not locking the sweeper,
		 * this could cause failure, of course!
		 */
		if (ss->ss_call_cnt % XT_TN_MAX_TO_FREE_CHECK == 0)
			xn_sw_service_to_free(self, ss, TRUE);
		ss->ss_call_cnt++;
	}

	item.ri_wait_xn_id = ss->ss_db->db_xn_curr_id;
	item.ri_tab_id = 0;
	item.x.ri_xn_id = xn_id;
	xt_bq_add(self, &ss->ss_to_free, &item);
}
#endif
1931
 
 
1932
 
static void xt_sw_delete_variations(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtXactID xn_id)
1933
 
{
1934
 
        xtRecordID prev_var_rec_id;
1935
 
 
1936
 
        while (rec_id) {
1937
 
                switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, FALSE, row_id, xn_id)) {
1938
 
                        case XT_ERR:
1939
 
                                throw_();
1940
 
                                return;
1941
 
                        case TRUE:
1942
 
                                break;
1943
 
                }
1944
 
                rec_id = prev_var_rec_id;
1945
 
        }
1946
 
}
1947
 
 
1948
 
static void xt_sw_delete_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtBool clean_delete, xtRowID row_id, xtXactID xn_id)
1949
 
{
1950
 
        xtRecordID prev_var_rec_id;
1951
 
 
1952
 
        switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, clean_delete, row_id, xn_id)) {
1953
 
                case XT_ERR:
1954
 
                        throw_();
1955
 
                        return;
1956
 
                case TRUE:
1957
 
                        break;
1958
 
                case FALSE:
1959
 
                        break;
1960
 
        }
1961
 
}
1962
 
 
1963
 
/* Set rec_type to this value in order to force cleanup, without
 * a check.
 *
 * xn_sw_cleanup_done() treats this value specially: instead of comparing
 * the record header against the logged transaction ID, record type,
 * stat ID and row ID, it only reports the cleanup as already done when
 * the record is free.
 */
#define XN_FORCE_CLEANUP		XT_TAB_STATUS_FREED
1967
 
 
1968
 
/*
1969
 
 * Read the record to be cleaned. Return TRUE if the cleanup has already been done.
1970
 
 */
1971
 
static xtBool xn_sw_cleanup_done(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtXactID xn_id, u_int rec_type, u_int stat_id, xtRowID row_id, XTTabRecHeadDPtr rec_head)
1972
 
{
1973
 
        if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) rec_head))
1974
 
                throw_();
1975
 
 
1976
 
        if (rec_type == XN_FORCE_CLEANUP) {
1977
 
                if (XT_REC_IS_FREE(rec_head->tr_rec_type_1))
1978
 
                        return TRUE;
1979
 
        }
1980
 
        else {
1981
 
                /* Transaction must match: */
1982
 
                if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
1983
 
                        return TRUE;
1984
 
 
1985
 
                /* Record header must match expected value from
1986
 
                 * log or clean has been done, or is not required.
1987
 
                 *
1988
 
                 * For example, it is not required if a record
1989
 
                 * has been overwritten in a transaction.
1990
 
                 */
1991
 
                if (rec_head->tr_rec_type_1 != rec_type ||
1992
 
                        rec_head->tr_stat_id_1 != stat_id)
1993
 
                        return TRUE;
1994
 
 
1995
 
                /* Row must match: */
1996
 
                if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
1997
 
                        return TRUE;
1998
 
        }
1999
 
 
2000
 
        return FALSE;
2001
 
}
2002
 
 
2003
 
static void xn_sw_clean_indices(XTThreadPtr XT_NDEBUG_UNUSED(self), XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtWord1 *rec_data, xtWord1 *rec_buffer)
2004
 
{
2005
 
        XTTableHPtr     tab = ot->ot_table;
2006
 
        u_int           cols_req;
2007
 
        XTIndexPtr      *ind;
2008
 
 
2009
 
        if (!tab->tab_dic.dic_key_count)
2010
 
                return;
2011
 
 
2012
 
        cols_req = tab->tab_dic.dic_ind_cols_req;
2013
 
        if (XT_REC_IS_FIXED(rec_data[0]))
2014
 
                rec_buffer = rec_data + XT_REC_FIX_HEADER_SIZE;
2015
 
        else {
2016
 
                if (XT_REC_IS_VARIABLE(rec_data[0])) {
2017
 
                        if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buffer, cols_req))
2018
 
                                goto failed;
2019
 
                }
2020
 
                else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
2021
 
                        ASSERT(cols_req);
2022
 
                        if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
2023
 
                                if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buffer, cols_req))
2024
 
                                        goto failed;
2025
 
                        }
2026
 
                        else {
2027
 
                                if (rec_data != ot->ot_row_rbuffer)
2028
 
                                        memcpy(ot->ot_row_rbuffer, rec_data, tab->tab_dic.dic_rec_size);
2029
 
                                if (!xt_tab_load_ext_data(ot, rec_id, rec_buffer, cols_req))
2030
 
                                        goto failed;
2031
 
                        }
2032
 
                }
2033
 
                else
2034
 
                        /* This is possible, the record has already been cleaned up. */
2035
 
                        return;
2036
 
        }
2037
 
 
2038
 
        ind = tab->tab_dic.dic_keys;
2039
 
        for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, ind++) {
2040
 
                if (!xt_idx_update_row_id(ot, *ind, rec_id, row_id, rec_buffer))
2041
 
                        xt_log_and_clear_exception_ns();
2042
 
        }
2043
 
        return;
2044
 
        
2045
 
        failed:
2046
 
        xt_log_and_clear_exception_ns();
2047
 
}
2048
 
 
2049
 
/*
 * Clean up one record (variation) written by transaction xact.
 *
 * tab_id, rec_id, status, rec_type, stat_id and row_id come from the log
 * entry that recorded the change. rec_buf points to the record image in
 * that log entry, starting at its record-type byte; it is used to rebuild
 * the index entries.
 *
 * Returns OK when the cleanup was done (or found to be unnecessary).
 * If the table cannot be opened, the skip_cleanup value reported by
 * xn_sw_get_open_table() is returned instead: the original contract says
 * TRUE means "table no longer exists, consider cleanup done" and FAILED
 * means cleanup could not be done because dictionary information is not
 * available (NOTE(review): confirm against xn_sw_get_open_table()).
 * Other errors are propagated with throw_().
 */
static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact, xtTableID tab_id, xtRecordID rec_id, u_int status, u_int rec_type, u_int stat_id, xtRowID row_id, xtWord1 *rec_buf)
{
	XTOpenTablePtr		ot;
	XTTableHPtr			tab;
	XTTabRecHeadDRec	rec_head;
	xtRecordID			after_rec_id;
	xtXactID			xn_id;
	xtBool				skip_cleanup;

	if (!(ot = xn_sw_get_open_table(self, ss, tab_id, &skip_cleanup)))
		/* The table no longer exists, consider cleanup done: */
		return skip_cleanup;

	tab = ot->ot_table;
	ASSERT_NS(ot->ot_thread == self);

	/* Make sure the buffer is large enough! */
	xt_db_set_size(self, &ss->ss_databuf, (size_t) tab->tab_dic.dic_mysql_buf_size);

	xn_id = xact->xd_start_xn_id;
	if (xact->xd_flags & XT_XN_XAC_COMMITTED) {
		/* The transaction has been committed. Clean the record and
		 * remove variations no longer in use.
		 */
		switch (status) {
			case XT_LOG_ENT_REC_MODIFIED:
			case XT_LOG_ENT_UPDATE:
			case XT_LOG_ENT_UPDATE_FL:
			case XT_LOG_ENT_UPDATE_BG:
			case XT_LOG_ENT_UPDATE_FL_BG:
				/* Committed update: drop the older variations chained behind
				 * this record, set the cleaned bit and clear the
				 * previous-variation link in the header, log that header
				 * change, then update the index entries.
				 */
				if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
					goto done_ok;
				after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
				xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
				rec_head.tr_rec_type_1 |= XT_TAB_STATUS_CLEANED_BIT;
				XT_SET_NULL_DISK_4(rec_head.tr_prev_rec_id_4);
				/* Only the header bytes up to and including tr_prev_rec_id_4 are rewritten: */
				if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_CLEANED, 0, rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head))
					throw_();
				xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
				break;
			case XT_LOG_ENT_INSERT:
			case XT_LOG_ENT_INSERT_FL:
			case XT_LOG_ENT_INSERT_BG:
			case XT_LOG_ENT_INSERT_FL_BG: {
				/* POTENTIAL BUG 1:
				 *
				 * DROP TABLE IF EXISTS t1;
				 * CREATE TABLE t1 ( id int, name varchar(300)) engine=pbxt;
				 * 
				 * begin;
				 * insert t1(id, name) values(1, "aaa");
				 * update t1 set name=REPEAT('A', 300) where id = 1;
				 * commit;
				 * flush tables;
				 * select * from t1;
				 *
				 * Because the type of record changes, from VARIABLE to
				 * EXTENDED, the cleanup needs to take this into account.
				 *
				 * The input new status value which is written here
				 * depends on the first write to the record.
				 * However, the second write changes the record status.
				 *
				 * Previously we used an OR function to write the bit and
				 * return the byte value of the result.
				 *
				 * The write function now checks the record to be written
				 * to make sure it matches the record that needs to be
				 * cleaned. So OR'ing the bit is no longer required.
				 *
				 * POTENTIAL BUG 2:
				 *
				 * We have changed this to fix the following bug:
				 *
				 * T1 starts
				 * T2 starts
				 * T2 insert record 100 in row 50
				 * T2 commits
				 * T1 updates row 50 and adds record 101
				 *
				 * The sweeper does cleanup in order T1, T2, ...
				 *
				 * The sweeper cleans T1 by removing record 100 from the 
				 * row 50 variation list.
				 * This means that record 100 is free.
				 *
				 * The sweeper cleans T2 by marking record 100 as clean.
				 * !BUG! record 100 has already been freed!
				 *
				 * To avoid this we have to check a record before 
				 * cleaning (as we do above for update in xn_sw_cleanup_done())
				 * We check that the record is, in fact, the exact
				 * record that was inserted.
				 *
				 * This is now done by xt_tc_write_cond().
				 */
				xtOpSeqNo op_seq;

				rec_head.tr_rec_type_1 = rec_type | XT_TAB_STATUS_CLEANED_BIT;
				/* Only the record-type byte is written, and only if the cached
				 * record still matches xn_id, row_id, stat_id and rec_type.
				 */
				if(!tab->tab_recs.xt_tc_write_cond(self, ot->ot_rec_file, rec_id, rec_head.tr_rec_type_1, &op_seq, xn_id, row_id, stat_id, rec_type))
					/* The record was not updated by xt_tc_write_cond() and doesn't need to be (nothing to log). */
					break;
				if (!xt_xlog_modify_table(tab->tab_id, XT_LOG_ENT_REC_CLEANED_1, op_seq, 0, 0, rec_id, 1, &rec_head.tr_rec_type_1, self))
					throw_();
				xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
				break;
			}
			case XT_LOG_ENT_DELETE:
			case XT_LOG_ENT_DELETE_FL:
			case XT_LOG_ENT_DELETE_BG:
			case XT_LOG_ENT_DELETE_FL_BG:
				/* Committed delete: remove the older variations and the delete
				 * record itself, then free the row (when the row ID is known).
				 */
				if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
					goto done_ok;
				after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
				xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
				xt_sw_delete_variation(self, ss, ot, rec_id, TRUE, row_id, xn_id);
				if (row_id) {
					if (!xt_tab_free_row(ot, tab, row_id))
						throw_();
				}
				break;
		}
	}
	else {
		/* The transaction has been aborted. Remove the variation from the
		 * variation list. If this means the list is empty, then remove
		 * the record as well.
		 */
		xtRecordID			first_rec_id, next_rec_id, prev_rec_id;
		XTTabRecHeadDRec	prev_rec_head;

		if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
			goto done_ok;

		/* Fall back to the row ID stored in the record header: */
		if (!row_id)
			row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
		after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
		if (!row_id)
			goto unlink_done;

		/* Now remove the record from the variation list,
		 * (if it is still on the list).
		 */
		XT_TAB_ROW_WRITE_LOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);

		/* Find the variation before the variation we wish to remove
		 * (the list is linked through tr_prev_rec_id_4):
		 */
		if (!(xt_tab_get_row(ot, row_id, &first_rec_id)))
			goto failed;
		prev_rec_id = 0;
		next_rec_id = first_rec_id;
		while (next_rec_id != rec_id) {
			if (!next_rec_id) {
				/* The record was not found in the list (we are done) */
				XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
				goto unlink_done;
			}
			/* A read error ends the search; the record is then treated as
			 * not found on the list (the error is logged, not thrown).
			 */
			if (!xt_tab_get_rec_data(ot, next_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &prev_rec_head)) {
				xt_log_and_clear_exception(self);
				break;
			}
			
			prev_rec_id = next_rec_id;
			next_rec_id = XT_GET_DISK_4(prev_rec_head.tr_prev_rec_id_4);
		}

		if (next_rec_id == rec_id) {
			/* The record was found on the list: */
			if (prev_rec_id) {
				/* Unlink the deleted variation:
				 * I have found the following sequence:
				 *
				 * 17933 in use  1906112
				 * 1906112 delete      xact=2901   row=17933 prev=2419240
				 * 2419240 delete      xact=2899   row=17933 prev=2153360
				 * 2153360 record-X C  xact=2599   row=17933 prev=0 Xlog=151 Xoff=16824 Xsiz=100
				 *
				 * Despite the following facts which should prevent chains from
				 * forming:
				 *
				 * --- Only one transaction can modify a row
				 * at any one time. So it is not possible for a new change
				 * to be linked onto an uncommitted change.
				 * 
				 * --- Transactions that modify the same row
				 * twice do not allocate a new record for each change.
				 *
				 * -- A change that has been
				 * rolled back will not be linked onto. Instead
				 * the new transaction will link to the last
				 * committed record.
				 *
				 * So if the sweeper is slow in doing its job
				 * we can have the situation that a number of records
				 * can refer to the last committed record of the
				 * row.
				 *
				 * Only one will be referenced by the row pointer.
				 *
				 * The others will all have been rolled back.
				 * This occurs over here: [(4)]
				 */
				XT_SET_DISK_4(prev_rec_head.tr_prev_rec_id_4, after_rec_id);
				if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_UNLINKED, 0, prev_rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &prev_rec_head))
					goto failed;
			}
			else {
				/* Variation to be removed at the front of the list. */
				ASSERT(rec_id == first_rec_id);
				if (after_rec_id) {
					/* Unlink the deleted variation, from the front of the list: */
					if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_SET, row_id, after_rec_id))
						goto failed;
				}
				else {
					/* No more variations, remove the row: */
					if (!xt_tab_free_row(ot, tab, row_id))
						goto failed;
				}
			}
		}

		XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);

		/* Note: even when not found on the row list, the record must still
		 * be freed.
		 *
		 * There might be an exception to this, but there are very definite
		 * cases where this is required, for example when an unreferenced
		 * record is found and added to the clean up list xn_add_cu_record().
		 */

		unlink_done:
		/* Delete the extended record and index entries:
		 *
		 * NOTE! This must be done after we have released the row lock. Because
		 * a thread that does a duplicate check locks the index, and then
		 * checks whether a row is valid, and can deadlock with
		 * code that locks a row, then an index!
		 *
		 * However, this should all be OK, because the variation has been removed from the
		 * row variation list at this stage, and now just needs to be deleted.
		 */
		xt_sw_delete_variation(self, ss, ot, rec_id, FALSE, row_id, xn_id);
	}

	done_ok:
	return OK;

	/* Only reached from inside the row-locked section above, so the row
	 * lock is held here and must be released before throwing.
	 */
	failed:
	XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
	throw_();
	return FAILED;
}
2306
 
 
2307
 
/* Go through all updated records of a transaction and cleanup.
 * This means, if the transaction was aborted, then all the variations written
 * by the transaction must be removed.
 * If the transaction was committed then we remove older variations.
 * If a delete was committed this can lead to the row being removed.
 *
 * After a transaction has been cleaned it can be removed from RAM.
 * If this was the last transaction in a log, and the log has reached
 * threshold, and the log is no longer in exclusive use, then the log
 * can be deleted.
 *
 * The transaction log is scanned from the transaction's begin position
 * (xd_begin_log / xd_begin_offset) until its commit or abort entry. Each
 * record-change entry written by this transaction is cleaned with
 * xn_sw_cleanup_variation(); entries of other transactions are skipped.
 *
 * This function returns OK if the transaction was cleaned up, and FAILED
 * if a retry is required: the thread was asked to quit, a variation could
 * not be cleaned yet, the log ended before a non-recovered transaction
 * finished, or the cleanup log entry could not be written. Otherwise an
 * error is thrown.
 */
static xtBool xn_sw_cleanup_xact(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact)
{
	XTDatabaseHPtr		db = ss->ss_db;
	XTXactLogBufferDPtr	record;
	xtTableID			tab_id;
	xtRecordID			rec_id;
	xtXactID			xn_id;
	xtRowID				row_id;

	if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, xact->xd_begin_log, xact->xd_begin_offset, FALSE))
		xt_throw(self);

	for (;;) {
		if (self->t_quit)
			return FAILED;

		xn_sw_could_go_faster(self, db);

		if (!db->db_xlog.xlog_seq_next(&ss->ss_seqread, &record, FALSE, self))
			xt_throw(self);
		if (!record) {
			/* Recovered transactions are considered cleaned when we
			 * reach the end of the transaction log.
			 * This is required, because transactions that do
			 * not have a commit (or rollback) record, because they were
			 * running when the server last went down, will otherwise not
			 * have the cleanup completed!!
			 */
			ASSERT(xact->xd_flags & XT_XN_XAC_RECOVERED);
			if (!(xact->xd_flags & XT_XN_XAC_RECOVERED))
				return FAILED;
			goto cleanup_done;
		}
		switch (record->xh.xh_status_1) {
			case XT_LOG_ENT_NEW_LOG:
				/* Continue reading at the start of the log named in the entry: */
				if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, FALSE))
					xt_throw(self);
				break;
			case XT_LOG_ENT_COMMIT:
			case XT_LOG_ENT_ABORT:
				/* This transaction's own commit/abort ends the scan: */
				xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
				if (xn_id == xact->xd_start_xn_id)
					goto cleanup_done;
				break;
			case XT_LOG_ENT_REC_MODIFIED:
			case XT_LOG_ENT_UPDATE:
			case XT_LOG_ENT_INSERT:
			case XT_LOG_ENT_DELETE:
			case XT_LOG_ENT_UPDATE_BG:
			case XT_LOG_ENT_INSERT_BG:
			case XT_LOG_ENT_DELETE_BG:
				/* Record changes in the xu layout; skip other transactions' entries: */
				xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
				if (xn_id != xact->xd_start_xn_id)
					break;
				tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
				rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
				row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
				if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xu.xu_status_1, record->xu.xu_rec_type_1, record->xu.xu_stat_id_1, row_id, &record->xu.xu_rec_type_1))
					return FAILED;
				break;
			case XT_LOG_ENT_UPDATE_FL:
			case XT_LOG_ENT_INSERT_FL:
			case XT_LOG_ENT_DELETE_FL:
			case XT_LOG_ENT_UPDATE_FL_BG:
			case XT_LOG_ENT_INSERT_FL_BG:
			case XT_LOG_ENT_DELETE_FL_BG:
				/* Same as above, for entries in the xf layout: */
				xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
				if (xn_id != xact->xd_start_xn_id)
					break;
				tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
				rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
				row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
				if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xf.xf_status_1, record->xf.xf_rec_type_1, record->xf.xf_stat_id_1, row_id, &record->xf.xf_rec_type_1))
					return FAILED;
				break;
			default:
				break;
		}
	}

	cleanup_done:
	/* Write the log to indicate the transaction has been cleaned: */
	XTXactCleanupEntryDRec cu;

	cu.xc_status_1 = XT_LOG_ENT_CLEANUP;
	cu.xc_checksum_1 = XT_CHECKSUM_1(XT_CHECKSUM4_XACT(xact->xd_start_xn_id));
	XT_SET_DISK_4(cu.xc_xact_id_4, xact->xd_start_xn_id);

	if (!xt_xlog_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, XT_XLOG_NO_WRITE_NO_FLUSH))
		return FAILED;

	/* The entry was logged without write/flush; remember that a flush is owed: */
	ss->ss_flush_pending = TRUE;

	xact->xd_flags |= XT_XN_XAC_CLEANED;
#ifndef XT_SWEEPER_SORT_XACTS
	ASSERT(db->db_xn_to_clean_id == xact->xd_start_xn_id);
#endif
#ifdef MUST_DELAY_REMOVE
	xn_sw_add_xact_to_free(self, ss, xact->xd_start_xn_id);
#else
	xn_id = xact->xd_start_xn_id;
	if (xt_xn_delete_xact(db, xn_id, self)) {
		/* Recalculate the minimum memory transaction: */
		ASSERT(!xt_xn_is_before(xn_id, db->db_xn_min_ram_id));
		
		if (db->db_xn_min_ram_id == xn_id) {
			db->db_xn_min_ram_id = xn_id+1;
		}
		else {
			/* Advance db_xn_min_ram_id past IDs for which xn_get_xact_details()
			 * reports nothing (presumably no longer in RAM - confirm), stopping at
			 * the first one it finds or once the current transaction ID is passed.
			 */
			xtXactID xn_curr_xn_id = xt_xn_get_curr_id(db);

			while (!xt_xn_is_before(xn_curr_xn_id, db->db_xn_min_ram_id)) { // was db->db_xn_min_ram_id <= xn_curr_xn_id
				/* db_xn_min_ram_id may be changed, by some other process! */
				xn_id = db->db_xn_min_ram_id;
				if (xn_get_xact_details(db, xn_id, self, NULL, NULL, NULL, NULL))
					break;
				db->db_xn_min_ram_id = xn_id+1;
			}
		}
	}
#endif

	return OK;
}
2445
 
 
2446
 
static void xn_free_sw_state(XTThreadPtr self, XNSweeperStatePtr ss)
2447
 
{
2448
 
        xn_sw_close_open_table(self, ss);
2449
 
        if (ss->ss_db)
2450
 
                ss->ss_db->db_xlog.xlog_seq_exit(&ss->ss_seqread);
2451
 
        xt_db_set_size(self, &ss->ss_databuf, 0);
2452
 
        xt_bq_set_size(self, &ss->ss_to_free, 0);
2453
 
}
2454
 
 
2455
 
#ifdef XT_SWEEPER_SORT_XACTS
/*
 * Comparator with the qsort() signature for an array of XTXactDataPtr.
 *
 * Orders transactions by ascending end transaction ID, and by ascending
 * start transaction ID when the end IDs are equal. The IDs are compared
 * with plain '<' (no wrap-around handling), as before.
 *
 * Returns 0 when both keys are equal. The previous version returned 1 in
 * that case, which claims cmp(a, a) > 0 and is not a consistent ordering,
 * as qsort() requires. The obsolete `register` storage class was also
 * removed: it is deprecated in C++11 and ill-formed in C++17.
 */
static int xn_compare_xact(const void *a, const void *b)
{
	const XTXactDataPtr xact_a = *((const XTXactDataPtr *) a);
	const XTXactDataPtr xact_b = *((const XTXactDataPtr *) b);

	if (xact_a->xd_end_xn_id != xact_b->xd_end_xn_id)
		return xact_a->xd_end_xn_id < xact_b->xd_end_xn_id ? -1 : 1;
	if (xact_a->xd_start_xn_id != xact_b->xd_start_xn_id)
		return xact_a->xd_start_xn_id < xact_b->xd_start_xn_id ? -1 : 1;
	return 0;
}
#endif
2471
 
 
2472
 
static void xn_sw_main(XTThreadPtr self)
2473
 
{
2474
 
        XTDatabaseHPtr          db = self->st_database;
2475
 
        XNSweeperStatePtr       ss;
2476
 
        XTXactDataPtr           xact;
2477
 
        time_t                          idle_start = 0;
2478
 
        xtXactID                        curr_id;
2479
 
#ifdef XT_SWEEPER_SORT_XACTS
2480
 
        u_int                           i;
2481
 
        xtXactID                        next_clean_id;
2482
 
#else
2483
 
        XTXactDataPtr           xact2;
2484
 
#endif
2485
 
 
2486
 
        xt_set_priority(self, xt_db_sweeper_priority);
2487
 
 
2488
 
        alloczr_(ss, xn_free_sw_state, sizeof(XNSweeperStateRec), XNSweeperStatePtr);
2489
 
        ss->ss_db = db;
2490
 
 
2491
 
        if (!db->db_xlog.xlog_seq_init(&ss->ss_seqread, xt_db_log_buffer_size, FALSE))
2492
 
                xt_throw(self);
2493
 
 
2494
 
        ss->ss_to_free.bq_item_size = sizeof(XNSWToFreeItemRec);
2495
 
        ss->ss_to_free.bq_max_waste = XT_TN_MAX_TO_FREE_WASTE;
2496
 
        ss->ss_to_free.bq_item_inc = XT_TN_MAX_TO_FREE_INC;
2497
 
        ss->ss_call_cnt = 0;
2498
 
        ss->ss_flush_pending = FALSE;
2499
 
 
2500
 
        while (!self->t_quit) {
2501
 
                while (!self->t_quit) {
2502
 
                        curr_id = xt_xn_get_curr_id(db);
2503
 
 
2504
 
#ifdef XT_SWEEPER_SORT_XACTS
2505
 
                        /* Add transactions to the list if required: */
2506
 
                        while (db->db_sw_list_size < XT_SW_XACT_SORT_LIST_SIZE &&
2507
 
                                !xt_xn_is_before(curr_id, db->db_sw_to_add)) {
2508
 
                                if ((xact = xt_xn_get_xact(db, db->db_sw_to_add, self))) {
2509
 
                                        /* Only add transactions that have completed: */
2510
 
                                        if (!(xact->xd_flags & XT_XN_XAC_SWEEP))
2511
 
                                                break;
2512
 
 
2513
 
                                        /* Add only transactions that did an update to the list: */
2514
 
                                        if ((xact->xd_flags & XT_XN_XAC_LOGGED)) {
2515
 
                                                db->db_sw_xact_list[db->db_sw_list_size] = xact;
2516
 
                                                db->db_sw_list_size++;
2517
 
                                        }
2518
 
                                        else {
2519
 
                                                /* Should not be required (done by the transction itself)! */
2520
 
                                                if (xt_xn_delete_xact(db, db->db_sw_to_add, self)) {
2521
 
                                                        if (db->db_xn_min_ram_id == db->db_sw_to_add)
2522
 
                                                                db->db_xn_min_ram_id = db->db_sw_to_add+1;
2523
 
                                                }
2524
 
                                        }
2525
 
                                }
2526
 
                                db->db_sw_to_add++;
2527
 
                                /* If there are no transactions to be cleaned, then the
2528
 
                                 * next to clean will be at least the next one to check.
2529
 
                                 */
2530
 
                                if (!db->db_sw_list_size)
2531
 
                                        db->db_xn_to_clean_id = db->db_sw_to_add;
2532
 
                        }
2533
 
 
2534
 
                        if (!db->db_sw_list_size) {
2535
 
                                /* Nothing to do: */
2536
 
                                db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
2537
 
                                goto sleep;
2538
 
                        }
2539
 
 
2540
 
                        if (!db->db_sw_list_size == XT_SW_XACT_SORT_LIST_SIZE)
2541
 
                                db->db_sw_faster |= XT_SW_TOO_FAR_BEHIND;
2542
 
                        xn_sw_could_go_faster(self, db);
2543
 
                        idle_start = 0;
2544
 
 
2545
 
                        /* Sort the transactions, according to there end order: */
2546
 
                        qsort(db->db_sw_xact_list, db->db_sw_list_size, sizeof(XTXactDataPtr), xn_compare_xact);
2547
 
 
2548
 
                        for (i=0; i<db->db_sw_list_size; i++) {
2549
 
                                xact = db->db_sw_xact_list[i];
2550
 
                                if (!xt_xn_is_before(xact->xd_end_xn_id, db->db_sw_to_add))  // xact->xd_end_xn_id >= db->db_sw_to_add
2551
 
                                        break;
2552
 
 
2553
 
                                if (!xn_sw_cleanup_xact(self, ss, xact)) {
2554
 
                                        /* We failed to clean (try again later)... */
2555
 
#ifdef TRACE_SWEEPER_ACTIVITY
2556
 
                                        printf("SWEEPER: cleanup retry...\n", (int) xact->xd_start_xn_id);
2557
 
#endif
2558
 
                                        goto sleep;
2559
 
                                }
2560
 
                        }
2561
 
 
2562
 
                        if (i == db->db_sw_list_size) {
2563
 
                                /* All cleaned out: */
2564
 
                                db->db_xn_to_clean_id = db->db_sw_to_add;
2565
 
                                db->db_sw_list_size = 0;
2566
 
                        }
2567
 
                        else {
2568
 
                                u_int           j;
2569
 
 
2570
 
                                /* The next to clean will be the smallest still in the
2571
 
                                 * list.
2572
 
                                 *
2573
 
                                 * NOTE: db_xn_to_clean_id means that all transactions
2574
 
                                 * before this are clean.
2575
 
                                 *
2576
 
                                 * It may be that some after this point have also
2577
 
                                 * been cleaned!!
2578
 
                                 */
2579
 
                                next_clean_id = db->db_sw_xact_list[i]->xd_start_xn_id;
2580
 
                                for (j=i+1; j<db->db_sw_list_size; j++) {
2581
 
                                        if (xt_xn_is_before(db->db_sw_xact_list[j]->xd_start_xn_id, next_clean_id))
2582
 
                                                next_clean_id = db->db_sw_xact_list[j]->xd_start_xn_id;
2583
 
                                }
2584
 
                                db->db_xn_to_clean_id = next_clean_id;
2585
 
 
2586
 
                                if (i == 0) {
2587
 
                                        /* Something to do, but nothing done! */
2588
 
                                        if ((xact = xt_xn_get_xact(db, db->db_sw_to_add, self))) {
2589
 
                                                /* Before we go to sleep, lets just check again: */ 
2590
 
                                                if (!(xact->xd_flags & XT_XN_XAC_SWEEP)) {
2591
 
                                                        db->db_stat_sweep_waits++;
2592
 
                                                        goto sleep;
2593
 
                                                }
2594
 
                                        }
2595
 
                                }
2596
 
 
2597
 
                                memmove(db->db_sw_xact_list, &db->db_sw_xact_list[i], (db->db_sw_list_size - i) * sizeof(XTXactDataPtr));
2598
 
                                db->db_sw_list_size -= i;
2599
 
                        }
2600
 
#else
2601
 
                        /* We are just about to check the condition for sleeping,
2602
 
                         * so if the condition for sleeping holds, then we will
2603
 
                         * exit the loop and sleep.
2604
 
                         *
2605
 
                         * We will then sleep if nobody sets the flag before we
2606
 
                         * actually do sleep!
2607
 
                         */
2608
 
                        if (xt_xn_is_before(curr_id, db->db_xn_to_clean_id)) {
2609
 
                                db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
2610
 
                                break;
2611
 
                        }
2612
 
 
2613
 
                        /* {TUNING} How far to we allow the sweeper to get behind?
2614
 
                         * The higher this is, the higher burst performance can
2615
 
                         * be. But too high and the sweeper falls out of reading the
2616
 
                         * transaction log cache, and also starts to spread
2617
 
                         * changes around in index and data blocks that are no
2618
 
                         * longer hot.
2619
 
                         */
2620
 
                        if (curr_id - db->db_xn_to_clean_id > 250)
2621
 
                                db->db_sw_faster |= XT_SW_TOO_FAR_BEHIND;
2622
 
                        else
2623
 
                                db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
2624
 
                        xn_sw_could_go_faster(self, db);
2625
 
                        idle_start = 0;
2626
 
 
2627
 
                        if ((xact = xt_xn_get_xact(db, db->db_xn_to_clean_id, self))) {
2628
 
                                xtXactID xn_id;
2629
 
 
2630
 
                                /* The sweep flag is set when the transaction is ready for sweeping.
2631
 
                                 * Prepared transactions may not be swept!
2632
 
                                 */
2633
 
                                if (!(xact->xd_flags & XT_XN_XAC_SWEEP) || (xact->xd_flags & XT_XN_XAC_PREPARED))
2634
 
                                        goto sleep;
2635
 
 
2636
 
                                /* Check if we can cleanup the transaction.
2637
 
                                 * We do this by checking to see if there is any running
2638
 
                                 * transaction which start before the end of this transaction.
2639
 
                                 */
2640
 
                                xn_id = xact->xd_start_xn_id;
2641
 
                                while (xt_xn_is_before(xn_id, xact->xd_end_xn_id)) {
2642
 
                                        xn_id++;
2643
 
                                        if ((xact2 = xt_xn_get_xact(db, xn_id, self))) {
2644
 
                                                if (!(xact2->xd_flags & XT_XN_XAC_ENDED)) {
2645
 
                                                        /* A transaction was started before the end of
2646
 
                                                         * the transaction we wish to sweep, and this
2647
 
                                                         * transaction has not committed, the we have to
2648
 
                                                         * wait.
2649
 
                                                         */
2650
 
                                                        db->db_stat_sweep_waits++;
2651
 
                                                        goto sleep;
2652
 
                                                }
2653
 
                                        }
2654
 
                                }
2655
 
                                
2656
 
                                /* Can cleanup the transaction, and move to the next. */
2657
 
                                if (xact->xd_flags & XT_XN_XAC_LOGGED) {
2658
 
#ifdef TRACE_SWEEPER_ACTIVITY
2659
 
                                        printf("SWEEPER: cleanup %d\n", (int) xact->xd_start_xn_id);
2660
 
#endif
2661
 
                                        if (!xn_sw_cleanup_xact(self, ss, xact)) {
2662
 
                                                /* We failed to clean (try again later)... */
2663
 
#ifdef TRACE_SWEEPER_ACTIVITY
2664
 
                                                printf("SWEEPER: cleanup retry...\n", (int) xact->xd_start_xn_id);
2665
 
#endif
2666
 
                                                goto sleep;
2667
 
                                        }
2668
 
#ifdef TRACE_SWEEPER_ACTIVITY
2669
 
                                        printf("SWEEPER: cleanup DONE\n", (int) xact->xd_start_xn_id);
2670
 
#endif
2671
 
                                }
2672
 
                                else {
2673
 
                                        /* This was a read-only transaction, it is safe to
2674
 
                                         * just remove the transaction structure from memory.
2675
 
                                         * (should not be necessary because RO transactions
2676
 
                                         * do this themselves):
2677
 
                                         */
2678
 
                                        if (xt_xn_delete_xact(db, db->db_xn_to_clean_id, self)) {
2679
 
                                                if (db->db_xn_min_ram_id == db->db_xn_to_clean_id)
2680
 
                                                        db->db_xn_min_ram_id = db->db_xn_to_clean_id+1;
2681
 
                                        }
2682
 
                                }
2683
 
                        }
2684
 
                        
2685
 
                        /* Move on to clean the next: */
2686
 
                        db->db_xn_to_clean_id++;
2687
 
#endif
2688
 
                }
2689
 
 
2690
 
                sleep:                  
2691
 
 
2692
 
                xn_sw_close_open_table(self, ss);
2693
 
 
2694
 
                xn_sw_go_slower(self, db);
2695
 
 
2696
 
                /* Shrink the free list, if it is empty, and larger then
2697
 
                 * the default:
2698
 
                 */
2699
 
                if (ss->ss_to_free.bq_size > XT_TN_MAX_TO_FREE) {
2700
 
                        if (ss->ss_to_free.bq_front == 0 && ss->ss_to_free.bq_back == 0)
2701
 
                                xt_bq_set_size(self, &ss->ss_to_free, XT_TN_MAX_TO_FREE);
2702
 
                }
2703
 
 
2704
 
                /* Windows: close the log file that we have open for reading, if we
2705
 
                 * read past the end of the log on the last transaction.
2706
 
                 * This makes sure that the log is closed when the checkpointer
2707
 
                 * tries to remove or rename it!!
2708
 
                 */
2709
 
                if (ss->ss_seqread.xseq_log_file) {
2710
 
                        if (ss->ss_seqread.xseq_rec_log_id != ss->ss_seqread.xseq_log_id)
2711
 
                                db->db_xlog.xlog_seq_close(&ss->ss_seqread);
2712
 
                }
2713
 
 
2714
 
                if (ss->ss_flush_pending) {
2715
 
                        /* Flush pending means we have written something to the log.
2716
 
                         *
2717
 
                         * if so we flush the log so that the writer will also do
2718
 
                         * its work!
2719
 
                         *
2720
 
                         * This will lead to the freeer continuing if it is waiting.
2721
 
                         */
2722
 
 
2723
 
                        time_t now = time(NULL);
2724
 
                        if (idle_start) {
2725
 
                                /* By default, we wait for 2 seconds idle time, then
2726
 
                                 * we flush the log.
2727
 
                                 */
2728
 
                                if (now >= idle_start + 2) {
2729
 
                                        /* Don't do this if flusher is active! */
2730
 
                                        if (!db->db_fl_thread &&
2731
 
                                                !xt_xlog_flush_log(db, self))
2732
 
                                                xt_throw(self);
2733
 
                                        ss->ss_flush_pending = FALSE;
2734
 
                                }
2735
 
                        }
2736
 
                        else
2737
 
                                idle_start = now;
2738
 
                }
2739
 
 
2740
 
                /* {WAKE-SW} Waking up the sweeper is very expensive!
2741
 
                 * Cost is 3% of execution time on the test:
2742
 
                 * runTest(SMALL_SELECT_TEST, 2, 100000)
2743
 
                 *
2744
 
                 * On the other hand, polling every 1/10 second
2745
 
                 * is cheap, because the check for transactions
2746
 
                 * ready for cleanup is very quick.
2747
 
                 *
2748
 
                 * So this is the prefered method.
2749
 
                 */
2750
 
                xn_sw_wait_for_xact(self, db, 10);
2751
 
        }
2752
 
 
2753
 
        if (ss->ss_flush_pending) {
2754
 
                xt_xlog_flush_log(db, self);
2755
 
                ss->ss_flush_pending = FALSE;
2756
 
        }
2757
 
 
2758
 
        freer_(); // xn_free_sw_state(ss)
2759
 
}
2760
 
 
2761
 
static void *xn_sw_run_thread(XTThreadPtr self)
2762
 
{
2763
 
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
2764
 
        int                             count;
2765
 
        void                    *mysql_thread;
2766
 
 
2767
 
        if (!(mysql_thread = myxt_create_thread()))
2768
 
                xt_throw(self);
2769
 
 
2770
 
        while (!self->t_quit) {
2771
 
                try_(a) {
2772
 
                        /*
2773
 
                         * The garbage collector requires that the database
2774
 
                         * is in use because.
2775
 
                         */
2776
 
                        xt_use_database(self, db, XT_FOR_SWEEPER);
2777
 
 
2778
 
                        /* {BACKGROUND-RELEASE-DB}
2779
 
                         * This action is both safe and required:
2780
 
                         *
2781
 
                         * safe: releasing the database is safe because as
2782
 
                         * long as this thread is running the database
2783
 
                         * reference is valid, and this reference cannot
2784
 
                         * be the only one to the database because
2785
 
                         * otherwize this thread would not be running.
2786
 
                         *
2787
 
                         * required: releasing the database is necessary
2788
 
                         * otherwise we cannot close the database
2789
 
                         * correctly because we only shutdown this
2790
 
                         * thread when the database is closed and we
2791
 
                         * only close the database when all references
2792
 
                         * are removed.
2793
 
                         */
2794
 
                        xt_heap_release(self, self->st_database);
2795
 
 
2796
 
                        xn_sw_main(self);
2797
 
                }
2798
 
                catch_(a) {
2799
 
                        /* This error is "normal"! */
2800
 
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
2801
 
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
2802
 
                                self->t_exception.e_sys_err == SIGTERM))
2803
 
                                xt_log_and_clear_exception(self);
2804
 
                }
2805
 
                cont_(a);
2806
 
 
2807
 
                /* Avoid releasing the database (done above) */
2808
 
                self->st_database = NULL;
2809
 
                xt_unuse_database(self, self);
2810
 
 
2811
 
                /* After an exception, pause before trying again... */
2812
 
                /* Number of seconds */
2813
 
#ifdef DEBUG
2814
 
                count = 10;
2815
 
#else
2816
 
                count = 2*60;
2817
 
#endif
2818
 
                db->db_sw_idle = XT_THREAD_INERR;
2819
 
                while (!self->t_quit && count > 0) {
2820
 
                        sleep(1);
2821
 
                        count--;
2822
 
                }
2823
 
                db->db_sw_idle = XT_THREAD_BUSY;
2824
 
        }
2825
 
 
2826
 
   /*
2827
 
        * {MYSQL-THREAD-KILL}
2828
 
        myxt_destroy_thread(mysql_thread, TRUE);
2829
 
        */
2830
 
        return NULL;
2831
 
}
2832
 
 
2833
 
static void xn_sw_free_thread(XTThreadPtr self, void *data)
2834
 
{
2835
 
        XTDatabaseHPtr db = (XTDatabaseHPtr) data;
2836
 
 
2837
 
        if (db->db_sw_thread) {
2838
 
                xt_lock_mutex(self, &db->db_sw_lock);
2839
 
                pushr_(xt_unlock_mutex, &db->db_sw_lock);
2840
 
                db->db_sw_thread = NULL;
2841
 
                freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2842
 
        }
2843
 
}
2844
 
 
2845
 
/* Wait for a transaction to quit: */
2846
 
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs)
2847
 
{
2848
 
        xt_lock_mutex(self, &db->db_sw_lock);
2849
 
        pushr_(xt_unlock_mutex, &db->db_sw_lock);
2850
 
        db->db_sw_idle = XT_THREAD_IDLE;
2851
 
        if (!self->t_quit && !db->db_sw_faster)
2852
 
                xt_timed_wait_cond(self, &db->db_sw_cond, &db->db_sw_lock, hsecs * 10);
2853
 
        db->db_sw_idle = XT_THREAD_BUSY;
2854
 
        db->db_sw_check_count++;
2855
 
        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2856
 
}
2857
 
 
2858
 
xtPublic void xt_start_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
2859
 
{
2860
 
        char name[PATH_MAX];
2861
 
 
2862
 
        sprintf(name, "SW-%s", xt_last_directory_of_path(db->db_main_path));
2863
 
        xt_remove_dir_char(name);
2864
 
        db->db_sw_thread = xt_create_daemon(self, name);
2865
 
        xt_set_thread_data(db->db_sw_thread, db, xn_sw_free_thread);
2866
 
        xt_run_thread(self, db->db_sw_thread, xn_sw_run_thread);
2867
 
}
2868
 
 
2869
 
xtPublic void xt_init_sweeper_wait(XTThreadPtr self, XTDatabaseHPtr db)
2870
 
{
2871
 
        xtXactID        current_id;
2872
 
 
2873
 
        current_id = db->db_xn_curr_id;
2874
 
        if (!xt_xn_is_before(current_id, db->db_xn_to_clean_id)) {
2875
 
                double          init_diff, curr_done = 0;
2876
 
                time_t          start_time;
2877
 
                xtBool          print_progress = FALSE;
2878
 
                int                     perc_to_print = 1;
2879
 
 
2880
 
                init_diff = (double) xt_xn_diff(current_id, db->db_xn_to_clean_id);
2881
 
                start_time = time(NULL);
2882
 
 
2883
 
                xt_logf(XT_NT_INFO, "PBXT: Initial sweep, transactions to scan: %llu\n", (u_llong) init_diff);
2884
 
                
2885
 
                while (!xt_xn_is_before(current_id, db->db_xn_to_clean_id)) { // means: db->db_xn_to_clean_id <= current_id
2886
 
                        xt_lock_mutex(self, &db->db_sw_lock);
2887
 
                        pushr_(xt_unlock_mutex, &db->db_sw_lock);
2888
 
                        xt_wakeup_sweeper(db);
2889
 
                        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2890
 
 
2891
 
                        xt_sleep_milli_second(10);
2892
 
 
2893
 
                        if (!print_progress) {
2894
 
                                if (time(NULL) - start_time > 2)
2895
 
                                        print_progress = TRUE;
2896
 
                        }
2897
 
 
2898
 
                        if (print_progress) {
2899
 
                                curr_done = init_diff - (double) xt_xn_diff(current_id, db->db_xn_to_clean_id);
2900
 
                                while (perc_to_print <= (int) (curr_done / init_diff * 100)) {
2901
 
                                        if (((perc_to_print - 1) % 25) == 0)
2902
 
                                                xt_logf(XT_NT_INFO, "PBXT: ");
2903
 
                                        if ((perc_to_print % 25) == 0)
2904
 
                                                xt_logf(XT_NT_INFO, "%2d\n", perc_to_print);
2905
 
                                        else
2906
 
                                                xt_logf(XT_NT_INFO, "%2d ", perc_to_print);
2907
 
                                        xt_log_flush(self);
2908
 
                                        perc_to_print++;
2909
 
                                }
2910
 
                        }
2911
 
                }
2912
 
 
2913
 
                if (print_progress) {
2914
 
                        while (perc_to_print <= 100) {
2915
 
                                if (((perc_to_print - 1) % 25) == 0)
2916
 
                                        xt_logf(XT_NT_INFO, "PBXT: ");
2917
 
                                if ((perc_to_print % 25) == 0)
2918
 
                                        xt_logf(XT_NT_INFO, "%2d\n", perc_to_print);
2919
 
                                else
2920
 
                                        xt_logf(XT_NT_INFO, "%2d ", perc_to_print);
2921
 
                                xt_log_flush(self);
2922
 
                                perc_to_print++;
2923
 
                        }
2924
 
                }
2925
 
                xt_logf(XT_NT_INFO, "PBXT: Initial sweep complete, transactions scanned: %llu\n", (u_llong) init_diff);
2926
 
        }
2927
 
}
2928
 
 
2929
 
xtPublic void xt_wait_for_sweeper(XTThreadPtr self, XTDatabaseHPtr db, int abort_time)
2930
 
{
2931
 
        time_t  then, now;
2932
 
        xtBool  message = FALSE;
2933
 
 
2934
 
        if (db->db_sw_thread) {
2935
 
                then = time(NULL);
2936
 
                /* Changed xt_xn_get_curr_id(db) to db->db_xn_curr_id,
2937
 
                 * This should work because we are not concerned about the difference
2938
 
                 * between xt_xn_get_curr_id(db) and db->db_xn_curr_id,
2939
 
                 * Which is just a matter of when transactions we can expect ot find
2940
 
                 * in memory (see {GAP-INC-ADD-XACT})
2941
 
                 */
2942
 
                while (!xt_xn_is_before(db->db_xn_curr_id, db->db_xn_to_clean_id)) { // was db->db_xn_to_clean_id <= xt_xn_get_curr_id(db)
2943
 
                        xt_lock_mutex(self, &db->db_sw_lock);
2944
 
                        pushr_(xt_unlock_mutex, &db->db_sw_lock);
2945
 
                        xt_wakeup_sweeper(db);
2946
 
                        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2947
 
                        xt_sleep_milli_second(10);
2948
 
                        now = time(NULL);
2949
 
                        if (abort_time && now >= then + abort_time) {
2950
 
                                xt_logf(XT_NT_INFO, "Aborting wait for '%s' sweeper\n", db->db_name);
2951
 
                                message = FALSE;
2952
 
                                break;
2953
 
                        }
2954
 
                        if (now >= then + 2) {
2955
 
                                if (!message) {
2956
 
                                        message = TRUE;
2957
 
                                        xt_logf(XT_NT_INFO, "Waiting for '%s' sweeper...\n", db->db_name);
2958
 
                                }
2959
 
                        }
2960
 
                }
2961
 
 
2962
 
                if (message)
2963
 
                        xt_logf(XT_NT_INFO, "Sweeper '%s' done.\n", db->db_name);
2964
 
        }
2965
 
}
2966
 
 
2967
 
xtPublic void xt_stop_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
2968
 
{
2969
 
        XTThreadPtr thr_sw;
2970
 
 
2971
 
        if (db->db_sw_thread) {
2972
 
                xt_lock_mutex(self, &db->db_sw_lock);
2973
 
                pushr_(xt_unlock_mutex, &db->db_sw_lock);
2974
 
 
2975
 
                /* This pointer is safe as long as you have the transaction lock. */
2976
 
                if ((thr_sw = db->db_sw_thread)) {
2977
 
                        xtThreadID tid = thr_sw->t_id;
2978
 
 
2979
 
                        /* Make sure the thread quits when woken up. */
2980
 
                        xt_terminate_thread(self, thr_sw);
2981
 
 
2982
 
                        xt_wakeup_sweeper(db);
2983
 
        
2984
 
                        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2985
 
 
2986
 
                        /*
2987
 
                         * GOTCHA: This is a wierd thing but the SIGTERM directed
2988
 
                         * at a particular thread (in this case the sweeper) was
2989
 
                         * being caught by a different thread and killing the server
2990
 
                         * sometimes. Disconcerting.
2991
 
                         * (this may only be a problem on Mac OS X)
2992
 
                        xt_kill_thread(thread);
2993
 
                         */
2994
 
                        xt_wait_for_thread_to_exit(tid, FALSE);
2995
 
        
2996
 
                        /* PMC - This should not be necessary to set the signal here, but in the
2997
 
                         * debugger the handler is not called!!?
2998
 
                        thr_sw->t_delayed_signal = SIGTERM;
2999
 
                        xt_kill_thread(thread);
3000
 
                         */
3001
 
                        db->db_sw_thread = NULL;
3002
 
                }
3003
 
                else
3004
 
                        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
3005
 
        }
3006
 
}
3007
 
 
3008
 
xtPublic void xt_wakeup_sweeper(XTDatabaseHPtr db)
3009
 
{
3010
 
        /* This flag makes the gap for the race condition
3011
 
         * very small.
3012
 
         *
3013
 
         * However, this posibility still remains because
3014
 
         * we do not lock the mutex db_sw_lock here.
3015
 
         *
3016
 
         * The reason is that it is too expensive.
3017
 
         *
3018
 
         * In the event that the wakeup is missed the sleeper
3019
 
         * wait will timeout eventually.
3020
 
         */
3021
 
        if (db->db_sw_idle) {
3022
 
                if (!xt_broadcast_cond_ns(&db->db_sw_cond))
3023
 
                        xt_log_and_clear_exception_ns();
3024
 
        }
3025
 
}