1
/* Copyright (c) 2005 PrimeBase Technologies GmbH
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* 2005-04-10 Paul McCullagh
24
#include "xt_config.h"
33
#include "xaction_xt.h"
34
#include "database_xt.h"
35
#include "strutil_xt.h"
39
#include "tabcache_xt.h"
42
//#define TRACE_WAIT_FOR
43
//#define TRACE_VARIATIONS
44
//#define TRACE_SWEEPER_ACTIVITY
46
/* Enable to trace the statements executed by the engine: */
47
//#define TRACE_STATEMENTS
50
#if defined(TRACE_STATEMENTS) || defined(TRACE_VARIATIONS)
51
#define TRACE_TRANSACTION
54
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs);
55
static xtBool xn_get_xact_details(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), int *flags, xtXactID *start, xtXactID *end, xtThreadID *thd_id);
56
static xtBool xn_get_xact_pointer(XTDatabaseHPtr db, xtXactID xn_id, XTXactDataPtr *xact_ptr);
58
/* ============================================================================================== */
60
typedef struct XNSWRecItem {
63
} XNSWRecItemRec, *XNSWRecItemPtr;
65
typedef struct XNSWToFreeItem {
66
xtTableID ri_tab_id; /* If non-zero, then this is the table of the data record to be freed.
67
* If zero, then the transaction below must be freed.
73
xtXactID ri_wait_xn_id; /* Wait for this transaction to be cleaned (or being cleaned up)
74
* before freeing this resource. */
75
} XNSWToFreeItemRec, *XNSWToFreeItemPtr;
77
/* ----------------------------------------------------------------------
78
* WAIT FOR TRANSACTIONS
81
typedef struct XNWaitFor {
82
xtXactID wf_waiting_xn_id; /* The transaction of the waiting thread. */
83
xtXactID wf_for_me_xn_id; /* The transaction we are waiting for. */
84
} XNWaitForRec, *XNWaitForPtr;
86
static int xn_compare_wait_for(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
88
xtXactID *x = (xtXactID *) a;
89
XNWaitForPtr y = (XNWaitForPtr) b;
91
if (*x == y->wf_waiting_xn_id)
93
if (xt_xn_is_before(*x, y->wf_waiting_xn_id))
98
static void xn_free_wait_for(XTThreadPtr XT_UNUSED(self), void *XT_UNUSED(thunk), void *XT_UNUSED(item))
103
* A deadlock occurs when a transaction is waiting for itself!
104
* For example A is waiting for B which is waiting for A.
105
* By repeatedly scanning the wait_for list we can find out if a
106
* transaction is waiting for itself.
108
static xtBool xn_detect_deadlock(XTDatabaseHPtr db, xtXactID waiting, xtXactID for_me)
113
if (waiting == for_me) {
114
#ifdef TRACE_WAIT_FOR
115
for (u_int i=0; i<xt_sl_get_size(db->db_xn_wait_for); i++) {
116
wf = (XNWaitForPtr) xt_sl_item_at(db->db_xn_wait_for, i);
117
xt_trace("T%lu --> T%lu\n", (u_long) wf->wf_waiting_xn_id, (u_long) wf->wf_for_me_xn_id);
119
xt_ttracef(xt_get_self(), "DEADLOCK\n");
122
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_DEADLOCK);
125
if (!(wf = (XNWaitForPtr) xt_sl_find(NULL, db->db_xn_wait_for, &for_me)))
127
for_me = wf->wf_for_me_xn_id;
132
#ifdef XT_USE_SPINLOCK_WAIT_FOR
134
#if defined(XT_MAC) || defined(XT_WIN)
135
#define WAIT_SPIN_COUNT 10
137
#define WAIT_SPIN_COUNT 50
140
/* Should not be required, but we wait for a second,
141
* just in case the wakeup is missed!
144
#define WAIT_FOR_XACT_TIME 30000
146
#define WAIT_FOR_XACT_TIME 1000
149
static xtBool xn_add_to_wait_for(XTDatabaseHPtr db, XNWaitForPtr wf, XTThreadPtr thread)
151
/* If we are waiting for a transaction to end,
152
* put this thread on the wait list...
154
* As long as the temporary lock is removed
155
* or turned into a permanent lock before
156
* a thread waits again, all should be OK!
158
xt_spinlock_lock(&db->db_xn_wait_spinlock);
160
#ifdef TRACE_WAIT_FOR
161
xt_ttracef(thread, "T%lu -wait-> T%lu\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) wait_xn_id);
163
/* Check for a deadlock: */
164
if (xn_detect_deadlock(db, wf->wf_waiting_xn_id, wf->wf_for_me_xn_id))
167
/* We will wait for this transaction... */
168
db->db_xn_wait_count++;
169
if (thread->st_xact_writer)
170
db->db_xn_writer_wait_count++;
172
if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf->wf_waiting_xn_id, wf)) {
173
db->db_xn_wait_count--;
177
xt_spinlock_unlock(&db->db_xn_wait_spinlock);
181
xt_spinlock_unlock(&db->db_xn_wait_spinlock);
185
inline void xn_remove_from_wait_for(XTDatabaseHPtr db, XNWaitForPtr wf, XTThreadPtr thread)
187
xt_spinlock_lock(&db->db_xn_wait_spinlock);
189
xt_sl_delete(NULL, db->db_xn_wait_for, &wf->wf_waiting_xn_id);
190
db->db_xn_wait_count--;
191
if (thread->st_xact_writer)
192
db->db_xn_writer_wait_count--;
194
#ifdef TRACE_WAIT_FOR
195
xt_ttracef(thread, "T%lu -wait-> T%lu FAILED\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) wait_xn_id);
197
xt_spinlock_unlock(&db->db_xn_wait_spinlock);
200
/* Wait for a transaction to terminate or a lock to be granted.
202
* If term_req is TRUE, then the termination of the transaction is required
205
* If pw_func is set then this function will not return before this call has
208
* This function returns FALSE on error.
210
xtPublic xtBool xt_xn_wait_for_xact(XTThreadPtr thread, XTXactWaitPtr xw, XTLockWaitPtr lw)
212
XTDatabaseHPtr db = thread->st_database;
216
XTXactDataPtr wait_xact_ptr;
217
xtBool on_wait_list = FALSE;
218
XTXactWaitRec xw_new;
219
u_int loop_count = 0;
220
XTWaitThreadPtr my_wt;
222
ASSERT_NS(thread->st_xact_data);
223
thread->st_statistics.st_wait_for_xact++;
225
wf.wf_waiting_xn_id = thread->st_xact_data->xd_start_xn_id;
228
/* If we are here, then the lw structure is on the wait
229
* queue for the given lock.
231
xtXactID locking_xn_id;
234
locking_xn_id = lw->lw_xn_id;
235
wf.wf_for_me_xn_id = lw->lw_xn_id;
236
if (!xn_add_to_wait_for(db, &wf, thread)) {
237
lw->lw_ot->ot_table->tab_locks.xt_cancel_temp_lock(lw);
241
while (loop_count < WAIT_SPIN_COUNT) {
244
switch (lw->lw_curr_lock) {
246
xn_remove_from_wait_for(db, &wf, thread);
250
/* Check if we must also wait for the transaction: */
251
if (lw->lw_row_updated) {
252
/* This will override the xw passed in.
253
* The reason is, because we are actually waiting
254
* for a lock, and the lock owner may have changed
255
* while we were waiting for the lock.
257
xw_new.xw_xn_id = lw->lw_updating_xn_id;
261
if (wf.wf_for_me_xn_id == xw->xw_xn_id)
264
xn_remove_from_wait_for(db, &wf, thread);
267
xn_remove_from_wait_for(db, &wf, thread);
271
if (locking_xn_id != lw->lw_xn_id) {
272
/* Change the transaction that we are waiting for: */
273
xn_remove_from_wait_for(db, &wf, thread);
274
goto wait_for_locker;
283
/* The non-spinning version... */
284
wait_for_locker_no_spin:
285
THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
286
my_wt = xt_thr_array[thread->t_id].td_waiting;
287
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
288
xt_lock_mutex_ns(&my_wt->wt_lock);
291
switch (lw->lw_curr_lock) {
293
xt_unlock_mutex_ns(&my_wt->wt_lock);
294
xn_remove_from_wait_for(db, &wf, thread);
297
xt_unlock_mutex_ns(&my_wt->wt_lock);
298
if (lw->lw_row_updated) {
299
xw_new.xw_xn_id = lw->lw_updating_xn_id;
303
if (wf.wf_for_me_xn_id == xw->xw_xn_id)
306
xn_remove_from_wait_for(db, &wf, thread);
309
xn_remove_from_wait_for(db, &wf, thread);
313
if (locking_xn_id != lw->lw_xn_id) {
314
/* Change the transaction that we are waiting for: */
315
xt_unlock_mutex_ns(&my_wt->wt_lock);
316
xn_remove_from_wait_for(db, &wf, thread);
317
locking_xn_id = lw->lw_xn_id;
318
wf.wf_for_me_xn_id = lw->lw_xn_id;
319
if (!xn_add_to_wait_for(db, &wf, thread)) {
320
lw->lw_ot->ot_table->tab_locks.xt_cancel_temp_lock(lw);
323
goto wait_for_locker_no_spin;
328
xt_timed_wait_cond_ns(&my_wt->wt_cond, &my_wt->wt_lock, WAIT_FOR_XACT_TIME);
332
xt_unlock_mutex_ns(&my_wt->wt_lock);
337
xtThreadID tn_thd_id;
340
wf.wf_for_me_xn_id = xw->xw_xn_id;
342
if (!xn_get_xact_pointer(db, xw->xw_xn_id, &wait_xact_ptr))
343
/* The transaction was not found... */
347
/* This is a dirty read, but it should work! */
348
flags = wait_xact_ptr->xd_flags;
349
start = wait_xact_ptr->xd_start_xn_id;
350
tn_thd_id = wait_xact_ptr->xd_thread_id;
354
if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, &tn_thd_id))
355
flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
358
if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
359
/* The transaction has terminated! */
362
/* Tell the thread we are waiting for it: */
363
xt_add_to_wakeup_list(thread->t_id, tn_thd_id);
366
if (!xn_add_to_wait_for(db, &wf, thread))
371
/* The spinning version: */
372
while (loop_count < WAIT_SPIN_COUNT) {
378
/* This is a dirty read, but it should work! */
379
flags = wait_xact_ptr->xd_flags;
380
start = wait_xact_ptr->xd_start_xn_id;
383
if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, NULL))
384
flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
387
if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
388
/* The transaction has terminated! */
392
/* The non-spinning version:
394
* I believe I can avoid missing the wakeup signal
395
* by locking before we check if the transaction
398
* Even though db->db_xn_wait_on_cond is "dirty read".
400
* The reason is, before the signal is sent the
401
* lock is also acquired. This is not possible until
402
* this thread is safely sleeping.
404
THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
405
my_wt = xt_thr_array[thread->t_id].td_waiting;
406
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
408
xt_lock_mutex_ns(&my_wt->wt_lock);
412
/* This is a dirty read, but it should work! */
413
flags = wait_xact_ptr->xd_flags;
414
start = wait_xact_ptr->xd_start_xn_id;
417
if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, NULL))
418
flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
421
if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
422
/* The transaction has terminated! */
425
xt_timed_wait_cond_ns(&my_wt->wt_cond, &my_wt->wt_lock, WAIT_FOR_XACT_TIME);
428
xt_unlock_mutex_ns(&my_wt->wt_lock);
432
xn_remove_from_wait_for(db, &wf, thread);
438
#else // XT_USE_SPINLOCK_WAIT_FOR
440
* The given thread must wait for the specified transaction to terminate. This
441
* function places the transaction of the thread on a list of waiting threads.
443
* Before waiting we make a check for deadlocks. A deadlock occurs
444
* if waiting would introduce a cycle.
446
xtPublic xtBool old_xt_xn_wait_for_xact(XTThreadPtr thread, xtXactID xn_id, xtBool will_retry, XTLockWaitFuncPtr pw_func, XTLockWaitPtr pw_data)
448
XTDatabaseHPtr db = thread->st_database;
453
ASSERT_NS(thread->st_xact_data);
455
thread->st_statistics.st_wait_for_xact++;
456
wf.wf_waiting_xn_id = thread->st_xact_data->xd_start_xn_id;
457
wf.wf_for_me_xn_id = xn_id;
458
wf.wf_thread_id = thread->t_id;
460
xt_lock_mutex_ns(&db->db_xn_wait_lock);
462
#ifdef TRACE_WAIT_FOR
463
xt_ttracef(thread, "T%lu -wait-> T%lu\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) xn_id);
466
if (!xn_get_xact_details(db, xn_id, thread, &flags, &start, NULL, NULL))
469
/* This is a dirty read, but it should work! */
470
if ((flags & XT_XN_XAC_ENDED) || start != xn_id)
473
if (xn_detect_deadlock(db, wf.wf_waiting_xn_id, wf.wf_for_me_xn_id))
476
/* We will wait for this transaction... */
477
db->db_xn_wait_count++;
478
if (thread->st_xact_writer)
479
db->db_xn_writer_wait_count++;
481
if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id, &wf)) {
482
db->db_xn_wait_count--;
486
if (!xn_get_xact_details(db, xn_id, thread, &flags, &start, NULL, NULL)) {
487
xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
488
db->db_xn_wait_count--;
489
if (thread->st_xact_writer)
490
db->db_xn_writer_wait_count--;
494
if ((flags & XT_XN_XAC_ENDED) || start != xn_id) {
495
xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
496
db->db_xn_wait_count--;
497
if (thread->st_xact_writer)
498
db->db_xn_writer_wait_count--;
502
db->db_xn_post_wait[thread->t_id].pw_call_me = pw_func;
503
db->db_xn_post_wait[thread->t_id].pw_thread = thread;
504
db->db_xn_post_wait[thread->t_id].pw_data = pw_data;
506
/* Timed wait because it is possible that transaction quits before
509
if (!xt_timed_wait_cond(NULL, &db->db_xn_wait_cond, &db->db_xn_wait_lock, 2 * 1000)) {
510
xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
511
db->db_xn_wait_count--;
512
if (thread->st_xact_writer)
513
db->db_xn_writer_wait_count--;
517
db->db_xn_post_wait[thread->t_id].pw_call_me = NULL;
518
xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
519
db->db_xn_wait_count--;
520
if (thread->st_xact_writer)
521
db->db_xn_writer_wait_count--;
527
#ifdef TRACE_WAIT_FOR
528
xt_ttracef(thread, "T%lu -wait-> T%lu DONE\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) xn_id);
530
xt_unlock_mutex_ns(&db->db_xn_wait_lock);
534
#ifdef TRACE_WAIT_FOR
535
xt_ttracef(self, "T%lu -wait-> T%lu FAILED\n", (u_long) self->st_xact_data->xd_start_xn_id, (u_long) xn_id);
537
xt_unlock_mutex_ns(&db->db_xn_wait_lock);
541
xtPublic void old_xt_xn_wakeup_transactions(XTDatabaseHPtr db, XTThreadPtr thread)
546
xt_lock_mutex_ns(&db->db_xn_wait_lock);
547
/* The idea here is to release the oldest transactions
548
* first. Although this may not be completely fair
549
* it has the advantage that older transactions are
550
* encouraged to complete first.
552
* I have found the following problem with this test:
553
* runTest(INCREMENT_TEST, 16, INCREMENT_TEST_UPDATE_COUNT);
554
* with a bit of bad luck a transaction can be starved.
555
* This results in the sweeper stalling because it is
556
* waiting for an old transaction to quit so that
559
* Because the sweeper is waiting, the number of
560
* versions of the record to be updated
561
* begins to increase. In the above test over
562
* 1600 transactions remain uncleaned.
564
* This means that there are 1600 versions of the
565
* row which must be scanned to find the most
568
if ((len = (u_int) xt_sl_get_size(db->db_xn_wait_for))) {
569
for (u_int i=0; i<len; i++) {
570
wf = (XNWaitForPtr) xt_sl_item_at(db->db_xn_wait_for, i);
571
if (db->db_xn_post_wait[wf->wf_thread_id].pw_call_me) {
572
if (db->db_xn_post_wait[wf->wf_thread_id].pw_call_me(thread, &db->db_xn_post_wait[wf->wf_thread_id]))
573
db->db_xn_post_wait[wf->wf_thread_id].pw_call_me = NULL;
576
if (!xt_broadcast_cond_ns(&db->db_xn_wait_cond))
577
xt_log_and_clear_exception_ns();
579
ASSERT_NS(db->db_xn_wait_count == len);
580
xt_unlock_mutex_ns(&db->db_xn_wait_lock);
582
#endif // XT_USE_SPINLOCK_WAIT_FOR
584
/* ----------------------------------------------------------------------
592
u_long not_clean_max;
596
static void xn_free_xact(XTDatabaseHPtr db, XTXactSegPtr seg, XTXactDataPtr xact)
601
/* This indicates the structure is free: */
602
xact->xd_start_xn_id = 0;
603
if ((xtWord1 *) xact >= db->db_xn_data && (xtWord1 *) xact < db->db_xn_data_end) {
604
/* Put it in the free list: */
605
xact->xd_next_xact = seg->xs_free_list;
606
seg->xs_free_list = xact;
613
* GOTCHA: The value db->db_xn_curr_id may be a bit larger
614
* than the actual transaction created because there is
615
* a gap between the issue of the transaction ID
616
* and the creation of a memory structure.
617
* (indicated here: {GAP-INC-ADD-XACT})
619
* This function returns the actual current transaction ID.
620
* This is the number of the last transaction actually
623
* This means that if you call xt_xn_get_xact() with any
624
* number less than or equal to this value, not finding
625
* the transaction means it has already ended!
627
xtPublic xtXactID xt_xn_get_curr_id(XTDatabaseHPtr db)
631
register XTXactSegPtr seg = db->db_xn_idx;
633
/* Find the highest transaction ID actually created... */
634
curr_xn_id = seg->xs_last_xn_id;
636
for (i=1; i<XT_XN_NO_OF_SEGMENTS; i++, seg++) {
637
if (xt_xn_is_before(curr_xn_id, seg->xs_last_xn_id))
638
curr_xn_id = seg->xs_last_xn_id;
643
xtPublic XTXactDataPtr xt_xn_add_old_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
645
register XTXactDataPtr xact;
646
register XTXactSegPtr seg;
647
register XTXactDataPtr *hash;
650
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
651
XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
652
hash = &seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
655
if (xact->xd_start_xn_id == xn_id)
657
xact = xact->xd_next_xact;
660
if ((xact = seg->xs_free_list))
661
seg->xs_free_list = xact->xd_next_xact;
663
/* We have used up all the free transaction slots,
664
* the sweeper should work faster to free them
667
db->db_sw_faster |= XT_SW_NO_MORE_XACT_SLOTS;
668
if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
669
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
674
xact->xd_next_xact = *hash;
677
xact->xd_start_xn_id = xn_id;
678
xact->xd_end_xn_id = 0;
679
xact->xd_end_time = 0;
680
xact->xd_begin_log = 0;
683
/* Get the largest transaction id. */
684
if (xt_xn_is_before(seg->xs_last_xn_id, xn_id))
685
seg->xs_last_xn_id = xn_id;
688
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
691
if (tot_alloced > high_alloced)
692
high_alloced = tot_alloced;
697
static XTXactDataPtr xn_add_new_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
699
register XTXactDataPtr xact;
700
register XTXactSegPtr seg;
701
register XTXactDataPtr *hash;
704
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
705
XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
706
hash = &seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
708
if ((xact = seg->xs_free_list))
709
seg->xs_free_list = xact->xd_next_xact;
711
/* We have used up all the free transaction slots,
712
* the sweeper should work faster to free them
715
db->db_sw_faster |= XT_SW_NO_MORE_XACT_SLOTS;
716
if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
717
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
722
xact->xd_next_xact = *hash;
725
xact->xd_thread_id = thread->t_id;
726
xact->xd_start_xn_id = xn_id;
727
xact->xd_end_xn_id = 0;
728
xact->xd_end_time = 0;
729
xact->xd_begin_log = 0;
732
seg->xs_last_xn_id = xn_id;
733
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
736
if (tot_alloced > high_alloced)
737
high_alloced = tot_alloced;
742
static xtBool xn_get_xact_details(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), int *flags, xtXactID *start, xtWord4 *end, xtThreadID *thd_id)
744
register XTXactSegPtr seg;
745
register XTXactDataPtr xact;
746
xtBool found = FALSE;
748
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
749
XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
750
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
752
if (xact->xd_start_xn_id == xn_id) {
755
*flags = xact->xd_flags;
757
*start = xact->xd_start_xn_id;
759
*end = xact->xd_end_time;
761
*thd_id = xact->xd_thread_id;
764
xact = xact->xd_next_xact;
766
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
770
static xtBool xn_get_xact_pointer(XTDatabaseHPtr db, xtXactID xn_id, XTXactDataPtr *xact_ptr)
772
register XTXactSegPtr seg;
773
register XTXactDataPtr xact;
774
xtBool found = FALSE;
777
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
778
XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
779
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
781
if (xact->xd_start_xn_id == xn_id) {
783
/* We only return pointers to transaction structures that are permanently
786
if ((xtWord1 *) xact >= db->db_xn_data && (xtWord1 *) xact < db->db_xn_data_end)
790
xact = xact->xd_next_xact;
792
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
797
* Note, this function only returns TRUE if the transaction
798
* still needs to be cleaned.
800
static xtBool xn_get_xact_start(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), xtLogID *log_id, xtLogOffset *log_offset)
802
register XTXactSegPtr seg;
803
register XTXactDataPtr xact;
804
xtBool found = FALSE;
806
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
807
XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
808
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
810
if (xact->xd_start_xn_id == xn_id) {
811
/* Consider only transactions that have not been cleaned! */
812
if (!(xact->xd_flags & XT_XN_XAC_CLEANED))
815
*log_id = xact->xd_begin_log;
816
*log_offset = xact->xd_begin_offset;
820
xact = xact->xd_next_xact;
822
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
826
/* NOTE: this function may only be used by the sweeper or the recovery process. */
827
xtPublic XTXactDataPtr xt_xn_get_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread))
829
register XTXactSegPtr seg;
830
register XTXactDataPtr xact;
832
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
833
XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
834
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
836
if (xact->xd_start_xn_id == xn_id)
838
xact = xact->xd_next_xact;
840
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
845
* Delete a transaction, return TRUE if the transaction
848
xtPublic xtBool xt_xn_delete_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
850
XTXactDataPtr xact, pxact = NULL;
854
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
855
XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
856
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
858
if (xact->xd_start_xn_id == xn_id) {
860
pxact->xd_next_xact = xact->xd_next_xact;
862
seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE] = xact->xd_next_xact;
863
xn_free_xact(db, seg, xact);
864
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
868
xact = xact->xd_next_xact;
870
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
874
//#define DEBUG_RAM_LIST
875
#ifdef DEBUG_RAM_LIST
877
#define DEBUG_RAM_LIST_SIZE 80
879
int check_ram_init_count = 0;
880
xt_rwlock_type check_ram_lock;
881
xtXactID check_ram_trns[DEBUG_RAM_LIST_SIZE];
884
static void check_ram_init(void)
886
if (check_ram_init_count == 0)
887
xt_init_rwlock(NULL, &check_ram_lock);
888
check_ram_init_count++;
891
static void check_ram_free(void)
893
check_ram_init_count--;
894
if (check_ram_init_count == 0)
895
xt_free_rwlock(&check_ram_lock);
898
static void check_ram_min_id(XTDatabaseHPtr db)
902
xt_slock_rwlock_ns(&check_ram_lock);
903
for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
904
if (check_ram_trns[i] && xt_xn_is_before(check_ram_trns[i], db->db_xn_min_ram_id)) {
905
/* This should never happen! */
909
for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
910
if (check_ram_trns[i]) {
911
x_ptr = xt_xn_get_xact(db, check_ram_trns[i]);
918
xt_unlock_rwlock_ns(&check_ram_lock);
921
static void check_ram_add(xtXactID xn_id)
925
xt_xlock_rwlock_ns(&check_ram_lock);
926
for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
927
if (!check_ram_trns[i]) {
928
check_ram_trns[i] = xn_id;
929
xt_unlock_rwlock_ns(&check_ram_lock);
933
xt_unlock_rwlock_ns(&check_ram_lock);
934
printf("DEBUG --- List too small\n");
937
static void check_ram_del(xtXactID xn_id)
941
xt_xlock_rwlock_ns(&check_ram_lock);
942
for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
943
if (check_ram_trns[i] == xn_id) {
944
check_ram_trns[i] = 0;
945
xt_unlock_rwlock_ns(&check_ram_lock);
949
xt_unlock_rwlock_ns(&check_ram_lock);
953
/* ----------------------------------------------------------------------
957
xtPublic void xt_xn_init_db(XTThreadPtr self, XTDatabaseHPtr db)
962
#ifdef DEBUG_RAM_LIST
965
xt_spinlock_init_with_autoname(self, &db->db_xn_id_lock);
966
xt_spinlock_init_with_autoname(self, &db->db_xn_wait_spinlock);
967
xt_init_mutex_with_autoname(self, &db->db_xn_xa_lock);
968
//xt_init_mutex_with_autoname(self, &db->db_xn_wait_lock);
969
//xt_init_cond(self, &db->db_xn_wait_cond);
970
xt_init_mutex_with_autoname(self, &db->db_sw_lock);
971
xt_init_cond(self, &db->db_sw_cond);
972
xt_init_mutex_with_autoname(self, &db->db_wr_lock);
973
xt_init_cond(self, &db->db_wr_cond);
975
/* Pre-allocate transaction data structures: */
976
db->db_xn_data = (xtWord1 *) xt_malloc(self, sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS);
977
db->db_xn_data_end = db->db_xn_data + sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS;
978
xact = (XTXactDataPtr) db->db_xn_data;
979
for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
980
seg = &db->db_xn_idx[i];
981
XT_XACT_INIT_LOCK(self, &seg->xs_tab_lock);
982
for (u_int j=0; j<XT_XN_DATA_ALLOC_COUNT; j++) {
983
xact->xd_next_xact = seg->xs_free_list;
984
seg->xs_free_list = xact;
989
/* Create a sorted list for XA transactions recovered: */
990
db->db_xn_xa_list = xt_new_sortedlist(self, sizeof(XTXactXARec), 100, 50, xt_xn_xa_compare, db, NULL, FALSE, FALSE);
992
/* Initialize the data logs: */
993
db->db_datalogs.dlc_init(self, db);
995
/* Setup the transaction log: */
996
db->db_xlog.xlog_setup(self, db, (off_t) xt_db_log_file_threshold, xt_db_transaction_buffer_size, xt_db_log_file_count);
998
db->db_xn_end_time = 1;
1000
/* Initializing the restart file, also does
1001
* recovery. This returns the log position after recovery.
1003
* This is the log position where the writer thread will
1004
* begin. The writer thread writes changes to the database that
1005
* have been flushed to the log.
1007
xt_xres_init(self, db);
1009
/* Initialize the "last transaction in memory", by default
1010
* this is the current transaction ID, which is the ID
1011
* of the last transaction.
1013
for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
1014
seg = &db->db_xn_idx[i];
1015
XT_XACT_INIT_LOCK(self, &seg->xs_tab_lock);
1016
seg->xs_last_xn_id = db->db_xn_curr_id;
1020
* The next transaction to clean is the lowest transaction
1023
db->db_xn_to_clean_id = db->db_xn_min_ram_id;
1024
#ifdef XT_SWEEPER_SORT_XACTS
1025
db->db_sw_to_add = db->db_xn_min_ram_id;
1029
* No transactions are running, so the minimum transaction
1030
* ID is the next one to run:
1032
db->db_xn_min_run_id = db->db_xn_curr_id + 1;
1034
db->db_xn_wait_for = xt_new_sortedlist(self, sizeof(XNWaitForRec), 100, 50, xn_compare_wait_for, db, xn_free_wait_for, FALSE, FALSE);
1037
xtPublic void xt_xn_exit_db(XTThreadPtr self, XTDatabaseHPtr db)
1040
printf("=========> MOST TXs CURR ALLOC: %lu\n", tot_alloced);
1041
printf("=========> MOST TXs HIGH ALLOC: %lu\n", high_alloced);
1042
printf("=========> MAX TXs NOT CLEAN: %lu\n", not_clean_max);
1043
printf("=========> MAX TXs IN RAM: %lu\n", in_ram_max);
1045
XTXactPreparePtr xap, xap_next;
1047
xt_stop_sweeper(self, db); // Should be done already!
1048
xt_stop_writer(self, db); // Should be done already!
1050
xt_xres_exit(self, db);
1051
db->db_xlog.xlog_exit(self);
1053
db->db_datalogs.dlc_exit(self);
1055
for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
1058
seg = &db->db_xn_idx[i];
1059
for (u_int j=0; j<XT_XN_HASH_TABLE_SIZE; j++) {
1060
XTXactDataPtr xact, nxact;
1062
xact = seg->xs_table[j];
1064
nxact = xact->xd_next_xact;
1065
xn_free_xact(db, seg, xact);
1069
XT_XACT_FREE_LOCK(self, &seg->xs_tab_lock);
1071
if (db->db_xn_wait_for) {
1072
xt_free_sortedlist(self, db->db_xn_wait_for);
1073
db->db_xn_wait_for = NULL;
1075
if (db->db_xn_data) {
1076
xt_free(self, db->db_xn_data);
1077
db->db_xn_data = NULL;
1078
db->db_xn_data_end = NULL;
1081
xt_free_cond(&db->db_wr_cond);
1082
xt_free_mutex(&db->db_wr_lock);
1083
xt_free_cond(&db->db_sw_cond);
1084
xt_free_mutex(&db->db_sw_lock);
1085
//xt_free_cond(&db->db_xn_wait_cond);
1086
//xt_free_mutex(&db->db_xn_wait_lock);
1087
xt_free_mutex(&db->db_xn_xa_lock);
1088
for (u_int i=0; i<XT_XA_HASH_TAB_SIZE; i++) {
1089
xap = db->db_xn_xa_table[i];
1091
xap_next = xap->xp_next;
1096
if (db->db_xn_xa_list) {
1097
xt_free_sortedlist(self, db->db_xn_xa_list);
1098
db->db_xn_xa_list = NULL;
1100
xt_spinlock_free(self, &db->db_xn_wait_spinlock);
1101
xt_spinlock_free(self, &db->db_xn_id_lock);
1102
#ifdef DEBUG_RAM_LIST
1107
xtPublic void xt_xn_init_thread(XTThreadPtr self, int what_for)
1109
ASSERT(self->st_database);
1111
if (!xt_init_row_lock_list(&self->st_lock_list))
1114
case XT_FOR_COMPACTOR:
1115
self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
1118
/* The writer does not need a transaction buffer. */
1119
self->st_dlog_buf.dlb_init(self->st_database, 0);
1121
case XT_FOR_SWEEPER:
1123
self->st_dlog_buf.dlb_init(self->st_database, 0);
1126
self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
1131
xtPublic void xt_xn_exit_thread(XTThreadPtr self)
1133
if (self->st_xact_data)
1134
xt_xn_rollback(self);
1135
self->st_dlog_buf.dlb_exit(self);
1136
xt_exit_row_lock_list(&self->st_lock_list);
1139
/* ----------------------------------------------------------------------
1140
* Begin and End Transactions
1143
xtPublic xtBool xt_xn_begin(XTThreadPtr self)
1145
XTDatabaseHPtr db = self->st_database;
1148
ASSERT(!self->st_xact_data);
1150
xt_spinlock_lock(&db->db_xn_id_lock);
1151
xn_id = ++db->db_xn_curr_id;
1152
xt_spinlock_unlock(&db->db_xn_id_lock);
1155
if (xt_xn_is_before(not_clean_max, xn_id - db->db_xn_to_clean_id))
1156
not_clean_max = xn_id - db->db_xn_to_clean_id;
1157
if (xt_xn_is_before(in_ram_max, xn_id - db->db_xn_min_ram_id))
1158
in_ram_max = xn_id - db->db_xn_min_ram_id;
1160
/* {GAP-INC-ADD-XACT} This is the gap between incrementing the ID,
1161
* and creating the transaction in memory.
1162
* See xt_xn_get_curr_id().
1165
if (!(self->st_xact_data = xn_add_new_xact(db, xn_id, self)))
1167
self->st_xact_writer = FALSE;
1169
/* All transactions that committed before or at this time
1170
* are visible to this one: */
1171
self->st_visible_time = db->db_xn_end_time;
1173
#ifdef TRACE_TRANSACTION
1174
xt_ttracef(self, "BEGIN T%lu\n", (u_long) self->st_xact_data->xd_start_xn_id);
1176
#ifdef XT_TRACK_CONNECTIONS
1177
xt_track_conn_info[self->t_id].ci_curr_xact_id = self->st_xact_data->xd_start_xn_id;
1178
xt_track_conn_info[self->t_id].ci_xact_start = xt_trace_clock();
1183
static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
1188
ASSERT_NS(thread->st_xact_data);
1189
if ((xact = thread->st_xact_data)) {
1190
XTDatabaseHPtr db = thread->st_database;
1191
xtXactID xn_id = xact->xd_start_xn_id;
1194
if ((writer = thread->st_xact_writer)) {
1195
/* The transaction wrote something: */
1196
XTXactEndEntryDRec entry;
1199
sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
1200
entry.xe_status_1 = status;
1201
entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
1202
XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
1203
XT_SET_DISK_4(entry.xe_not_used_4, 0);
1205
#ifdef XT_IMPLEMENT_NO_ACTION
1206
/* This will check any resticts that have been delayed to the end of the statement. */
1207
if (thread->st_restrict_list.bl_count) {
1208
if (!xt_tab_restrict_rows(&thread->st_restrict_list, thread)) {
1210
status = XT_LOG_ENT_ABORT;
1215
/* Flush the data log: */
1216
if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
1218
status = XT_LOG_ENT_ABORT;
1221
/* Write and flush the transaction log:
1222
* We only flush if this was not a temp table.
1224
if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, thread->st_non_temp_updated ? xt_db_flush_log_at_trx_commit : XT_XLOG_NO_WRITE_NO_FLUSH)) {
1226
status = XT_LOG_ENT_ABORT;
1227
/* Make sure this is done, if we failed to log
1228
* the transction end!
1230
if (thread->st_xact_writer) {
1231
/* Adjust this in case of error, but don't forget
1234
xt_spinlock_lock(&db->db_xlog.xl_buffer_lock);
1235
db->db_xn_writer_count--;
1236
thread->st_xact_writer = FALSE;
1237
if (thread->st_xact_long_running) {
1238
db->db_xn_long_running_count--;
1239
thread->st_xact_long_running = FALSE;
1241
xt_spinlock_unlock(&db->db_xlog.xl_buffer_lock);
1245
/* Setting this flag completes the transaction,
1246
* Do this before we release the locks, because
1247
* the unlocked transactions expect the
1248
* transaction they are waiting for to be
1251
xact->xd_end_time = ++db->db_xn_end_time;
1252
if (status == XT_LOG_ENT_COMMIT) {
1253
thread->st_statistics.st_commits++;
1254
xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
1257
thread->st_statistics.st_rollbacks++;
1258
xact->xd_flags |= XT_XN_XAC_ENDED;
1261
/* {REMOVE-LOCKS} Drop locks is you have any: */
1262
thread->st_lock_list.xt_remove_all_locks(db, thread);
1264
/* Do this afterwards to make sure the sweeper
1265
* does not cleanup transactions start cleaning up
1266
* before any transactions that were waiting for
1267
* this transaction have completed!
1269
xact->xd_end_xn_id = db->db_xn_curr_id;
1271
/* Now you can sweep! */
1272
ASSERT_NS(xact->xd_flags & XT_XN_XAC_LOGGED);
1273
xact->xd_flags |= XT_XN_XAC_SWEEP;
1276
/* Read-only transaction can be removed, immediately */
1277
xact->xd_end_time = ++db->db_xn_end_time;
1278
xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
1280
/* Drop locks is you have any: */
1281
thread->st_lock_list.xt_remove_all_locks(db, thread);
1283
xact->xd_end_xn_id = db->db_xn_curr_id;
1285
ASSERT_NS(!(xact->xd_flags & XT_XN_XAC_LOGGED));
1286
xact->xd_flags |= XT_XN_XAC_SWEEP;
1288
if (xt_xn_delete_xact(db, xn_id, thread)) {
1289
if (db->db_xn_min_ram_id == xn_id)
1290
db->db_xn_min_ram_id = xn_id+1;
1294
#ifdef TRACE_TRANSACTION
1295
if (status == XT_LOG_ENT_COMMIT)
1296
xt_ttracef(thread, "COMMIT T%lu\n", (u_long) xn_id);
1298
xt_ttracef(thread, "ABORT T%lu\n", (u_long) xn_id);
1301
if (db->db_xn_min_run_id == xn_id)
1302
db->db_xn_min_run_id = xn_id+1;
1304
thread->st_xact_data = NULL;
1306
#ifdef XT_TRACK_CONNECTIONS
1307
xt_track_conn_info[thread->t_id].ci_prev_xact_id = xt_track_conn_info[thread->t_id].ci_curr_xact_id;
1308
xt_track_conn_info[thread->t_id].ci_prev_xact_time = xt_trace_clock() - xt_track_conn_info[thread->t_id].ci_xact_start;
1309
xt_track_conn_info[thread->t_id].ci_curr_xact_id = 0;
1310
xt_track_conn_info[thread->t_id].ci_xact_start = 0;
1313
xt_wakeup_waiting_threads(thread);
1315
/* {WAKE-SW} Waking the sweeper
1316
* is no longer unconditional.
1317
* (see all comments to {WAKE-SW})
1319
* We now wake the sweeper if it is
1320
* supposed to work faster.
1322
* There are now 2 cases:
1323
* - We run out of transaction slots.
1324
* - We encounter old index entries.
1326
* The following test:
1327
* runTest(INCREMENT_TEST, 16, INCREMENT_TEST_UPDATE_COUNT);
1328
* has extreme problems with sweeping every 1/10s
1329
* because a huge number of index entries accumulate
1330
* that need to be cleaned.
1332
* New code detects this case.
1334
if (db->db_sw_faster)
1335
xt_wakeup_sweeper(db);
1337
/* Don't get too far ahead of the sweeper! */
1339
#ifdef XT_WAIT_FOR_CLEANUP
1340
if (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) {
1341
/* Set a maximum wait time (1/100s) */
1342
xtWord8 then = xt_trace_clock() + (xtWord8) 100000;
1343
xtXactID wait_xn_id;
1345
/* This is the transaction that was committed 3 transactions ago: */
1346
wait_xn_id = thread->st_prev_xact[thread->st_last_xact];
1347
thread->st_prev_xact[thread->st_last_xact] = xn_id;
1348
/* This works because XT_MAX_XACT_BEHIND == 2! */
1349
ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == (thread->st_last_xact ^ 1));
1350
thread->st_last_xact ^= 1;
1352
while (xt_xn_is_before(db->db_xn_to_clean_id, wait_xn_id) && (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)) {
1353
if (xt_trace_clock() >= then)
1355
#ifdef XT_SWEEPER_SORT_XACTS
1356
if (!xn_get_xact_start(db, wait_xn_id, thread, NULL, NULL))
1363
if ((db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) != 0) {
1364
xtWord8 then = xt_trace_clock() + (xtWord8) 20000000;
1368
if (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)
1370
if (xt_trace_clock() >= then)
1380
xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
1382
return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
1385
xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
1387
return xn_end_xact(thread, XT_LOG_ENT_ABORT);
1390
xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
1392
XTXactNewTabEntryDRec entry;
1394
entry.xt_status_1 = XT_LOG_ENT_NEW_TAB;
1395
entry.xt_checksum_1 = XT_CHECKSUM_1(tab_id);
1396
XT_SET_DISK_4(entry.xt_tab_id_4, tab_id);
1397
return xt_xlog_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_WRITE_AND_FLUSH);
1400
xtPublic int xt_xn_status(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID XT_UNUSED(rec_id))
1402
register XTThreadPtr self = ot->ot_thread;
1407
/* Conditional waste of time!
1408
* Drizzle has strict warnings.
1409
* I know this is not necessary!
1414
if (xn_id == self->st_xact_data->xd_start_xn_id)
1415
return XT_XN_MY_UPDATE;
1416
if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id) ||
1417
!xn_get_xact_details(self->st_database, xn_id, ot->ot_thread, &flags, NULL, &end, NULL)) {
1418
/* Not in RAM, rollback done: */
1419
//*DBG*/xt_dump_xlogs(self->st_database, 0);
1420
//*DBG*/xt_check_table(self, ot);
1421
//*DBG*/xt_dump_trace();
1422
/* {XACT-NOT-IN-RAM}
1423
* This should never happen (CHANGED see below)!
1425
* Because if the transaction is no longer in RAM, then it has been
1426
* cleaned up. So the record should be marked as clean, or not
1429
* After sweeping, we wait for all transactions to quit that were
1430
* running at the time of cleanup before removing the transaction record.
1431
* (see {XACT-NOT-IN-RAM})
1433
* If this was not the case, then we could be here because:
1434
* - The user transaction (T2) reads record x and notes that the record
1435
* has not been cleaned (CLEAN bit not set).
1437
* - The sweeper is busy sweeping the transaction (T1) that created
1439
* The SW sets the CLEAN bit on record x, and the schedules T1 for
1442
* Now T1 should not be deleted before T2 quits. If it does happen
1443
* then we land up here.
1445
* THIS CAN NOW HAPPEN!
1447
* First of all, a MYSTERY:
1448
* This did happen, dispite the description above! The reason why
1449
* is left as an exercise to the reader (really, I don't now why!)
1451
* This has force me to add code to handle the situation. This
1452
* is done by re-reading the record that is being checked by this
1453
* function. After re-reading, the record should either be
1454
* invalid (free) or clean (CLEAN bit set).
1456
* If this is the case, then we will not run land up here
1459
* Because we are only here because the record was valid but not
1460
* clean (you can confirm this by looking at the code that
1461
* calls this function).
1463
return XT_XN_REREAD;
1465
if (!(flags & XT_XN_XAC_ENDED))
1466
/* Transaction not ended, may be visible. */
1467
return XT_XN_OTHER_UPDATE;
1468
/* Visible if the transaction was committed: */
1469
if (flags & XT_XN_XAC_COMMITTED) {
1470
if (!xt_xn_is_before(self->st_visible_time, end)) // was self->st_visible_time >= xact->xd_end_time
1471
return XT_XN_VISIBLE;
1472
return XT_XN_NOT_VISIBLE;
1474
return XT_XN_ABORTED;
1477
/* ----------------------------------------------------------------------
1481
xtPublic int xt_xn_xa_compare(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
1483
xtXactID *x = (xtXactID *) a;
1484
XTXactXAPtr y = (XTXactXAPtr) b;
1486
if (*x == y->xx_xact_id)
1488
if (xt_xn_is_before(*x, y->xx_xact_id))
1493
xtPublic xtBool xt_xn_prepare(int len, xtWord1 *xa_data, XTThreadPtr thread)
1497
ASSERT_NS(thread->st_xact_data);
1498
if ((xact = thread->st_xact_data)) {
1499
xtXactID xn_id = xact->xd_start_xn_id;
1501
/* Only makes sense if the transaction has already been logged: */
1502
if ((thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
1503
if (!xt_xlog_modify_table(0, XT_LOG_ENT_PREPARE, xn_id, 0, 0, 0, len, xa_data, thread))
1510
xtPublic xtBool xt_xn_store_xa_data(XTDatabaseHPtr db, xtXactID xact_id, int len, xtWord1 *xa_data, XTThreadPtr XT_UNUSED(thread))
1512
XTXactPreparePtr xap;
1516
if (!(xap = (XTXactPreparePtr) xt_malloc_ns(offsetof(XTXactPrepareRec, xp_xa_data) + len)))
1518
xap->xp_xact_id = xact_id;
1519
xap->xp_hash = xt_get_checksum4(xa_data, len);
1520
xap->xp_data_len = len;
1521
memcpy(xap->xp_xa_data, xa_data, len);
1522
xx.xx_xact_id = xact_id;
1525
idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
1526
xt_lock_mutex_ns(&db->db_xn_xa_lock);
1527
if (!xt_sl_insert(NULL, db->db_xn_xa_list, &xact_id, &xx)) {
1528
xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1531
xap->xp_next = db->db_xn_xa_table[idx];
1532
db->db_xn_xa_table[idx] = xap;
1533
xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1537
xtPublic void xt_xn_delete_xa_data_by_xact(XTDatabaseHPtr db, xtXactID xact_id, XTThreadPtr thread)
1541
xt_lock_mutex_ns(&db->db_xn_xa_lock);
1542
if (!(xx = (XTXactXAPtr) xt_sl_find(NULL, db->db_xn_xa_list, &xact_id)))
1544
xt_xn_delete_xa_data(db, xx->xx_xa_ptr, TRUE, thread);
1547
xtPublic void xt_xn_delete_xa_data(XTDatabaseHPtr db, XTXactPreparePtr xap, xtBool unlock, XTThreadPtr XT_UNUSED(thread))
1550
XTXactPreparePtr xap_ptr, xap_pptr = NULL;
1552
xt_sl_delete(NULL, db->db_xn_xa_list, &xap->xp_xact_id);
1553
idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
1554
xap_ptr = db->db_xn_xa_table[idx];
1559
xap_ptr = xap_ptr->xp_next;
1563
xap_pptr->xp_next = xap_ptr->xp_next;
1565
db->db_xn_xa_table[idx] = xap_ptr->xp_next;
1569
xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1572
xtPublic XTXactPreparePtr xt_xn_find_xa_data(XTDatabaseHPtr db, int len, xtWord1 *xa_data, xtBool lock, XTThreadPtr XT_UNUSED(thread))
1575
XTXactPreparePtr xap;
1579
xt_lock_mutex_ns(&db->db_xn_xa_lock);
1580
hash = xt_get_checksum4(xa_data, len);
1581
idx = hash % XT_XA_HASH_TAB_SIZE;
1582
xap = db->db_xn_xa_table[idx];
1584
if (xap->xp_hash == hash &&
1585
xap->xp_data_len == len &&
1586
memcmp(xap->xp_xa_data, xa_data, len) == 0) {
1595
xtPublic XTXactPreparePtr xt_xn_enum_xa_data(XTDatabaseHPtr db, XTXactEnumXAPtr exa)
1599
if (!exa->exa_locked) {
1600
xt_lock_mutex_ns(&db->db_xn_xa_lock);
1601
exa->exa_locked = TRUE;
1604
if ((xx = (XTXactXAPtr) xt_sl_item_at(db->db_xn_xa_list, exa->exa_index))) {
1606
return xx->xx_xa_ptr;
1609
if (exa->exa_locked) {
1610
exa->exa_locked = FALSE;
1611
xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1616
/* ----------------------------------------------------------------------
1617
* S W E E P E R F U N C T I O N S
1620
xtPublic xtWord8 xt_xn_bytes_to_sweep(XTDatabaseHPtr db, XTThreadPtr thread)
1623
xtXactID curr_xn_id;
1624
xtLogID xn_log_id = 0;
1625
xtLogOffset xn_log_offset = 0;
1626
xtLogID x_log_id = 0;
1627
xtLogOffset x_log_offset = 0;
1629
xtLogOffset log_offset;
1630
xtWord8 byte_count = 0;
1632
xn_id = db->db_xn_to_clean_id;
1633
curr_xn_id = xt_xn_get_curr_id(db);
1634
// Limit the number of transactions checked!
1635
for (int i=0; i<1000; i++) {
1636
if (xt_xn_is_before(curr_xn_id, xn_id))
1638
if (xn_get_xact_start(db, xn_id, thread, &x_log_id, &x_log_offset)) {
1640
if (xt_comp_log_pos(x_log_id, x_log_offset, xn_log_id, xn_log_offset) < 0) {
1641
xn_log_id = x_log_id;
1642
xn_log_offset = x_log_offset;
1646
xn_log_id = x_log_id;
1647
x_log_offset = x_log_offset;
1655
/* Assume the logs have the threshold: */
1656
log_id = db->db_xlog.xl_write_log_id;
1657
log_offset = db->db_xlog.xl_write_log_offset;
1658
if (xn_log_id < log_id) {
1659
if (xn_log_offset < xt_db_log_file_threshold)
1660
byte_count = (size_t) (xt_db_log_file_threshold - xn_log_offset);
1664
while (xn_log_id < log_id) {
1665
byte_count += (size_t) xt_db_log_file_threshold;
1668
if (xn_log_offset < log_offset)
1669
byte_count += (size_t) (log_offset - xn_log_offset);
1674
/* ----------------------------------------------------------------------
1675
* S W E E P E R P R O C E S S
1678
typedef struct XNSweeperState {
1679
XTDatabaseHPtr ss_db;
1680
XTXactSeqReadRec ss_seqread;
1681
XTDataBufferRec ss_databuf;
1683
XTBasicQueueRec ss_to_free;
1684
xtBool ss_flush_pending;
1685
xtTableID ss_not_found; /* Cache the last table not found, this saves time. */
1686
xtTableID ss_not_recovered; /* Cache the last table not recovered. */
1687
XTOpenTablePtr ss_ot;
1688
} XNSweeperStateRec, *XNSweeperStatePtr;
1691
* This function NULL if the table cannot be opened.
1692
* In this case cleanup_done will be set to TRUE
1693
* if the cleanup should be skipped.
1696
static XTOpenTablePtr xn_sw_get_open_table(XTThreadPtr self, XNSweeperStatePtr ss, xtTableID tab_id, xtBool *skip_cleanup)
1699
if (ss->ss_ot->ot_table->tab_id == tab_id)
1702
xt_db_return_table_to_pool(self, ss->ss_ot);
1706
if (ss->ss_not_found == tab_id || ss->ss_not_recovered == tab_id) {
1707
*skip_cleanup = TRUE;
1714
if (!(ss->ss_ot = xt_db_open_pool_table(self, ss->ss_db, tab_id, &r, TRUE))) {
1716
case XT_TAB_NOT_FOUND:
1717
/* Remember the table if it was not found: */
1718
ss->ss_not_found = tab_id;
1719
*skip_cleanup = TRUE;
1721
case XT_TAB_NO_DICTIONARY:
1722
case XT_TAB_POOL_CLOSED:
1723
*skip_cleanup = FALSE;
1726
*skip_cleanup = TRUE;
1732
/* Don't sweep transactions for table that have not been
1735
if (ss->ss_ot->ot_table->tab_recovery_not_done) {
1736
xt_db_return_table_to_pool(self, ss->ss_ot);
1738
ss->ss_not_recovered = tab_id;
1739
*skip_cleanup = TRUE;
1747
static void xn_sw_close_open_table(XTThreadPtr self, XNSweeperStatePtr ss)
1750
xt_db_return_table_to_pool(self, ss->ss_ot);
1756
* A thread can set a bit in db_sw_faster to make
1757
* the sweeper go faster.
1759
static void xn_sw_could_go_faster(XTThreadPtr self, XTDatabaseHPtr db)
1761
if (db->db_sw_faster) {
1762
if (!db->db_sw_fast) {
1763
xt_set_priority(self, xt_db_sweeper_priority+1);
1764
db->db_sw_fast = TRUE;
1769
static void xn_sw_go_slower(XTThreadPtr self, XTDatabaseHPtr db)
1771
if (db->db_sw_fast) {
1772
xt_set_priority(self, xt_db_sweeper_priority);
1773
db->db_sw_fast = FALSE;
1775
db->db_sw_faster = XT_SW_WORK_NORMAL;
1778
/* Add a record to the "to free" queue. We note the current
1779
* transaction at the time this is done. The record will
1780
* only be freed once this transaction terminated, together
1781
* with all transactions that started before it!
1783
* The reason for this is that a sequential scan or some
1784
* other operation may read a committed record which is no longer
1785
* valid because it is no longer the latest variation (the first
1786
* variation reachable from the row pointer).
1788
* In this case, the sweeper will free the variation.
1789
* If the variation is re-used and committed before
1790
* the sequential scan or read completes, and by some
1791
* fluke is used by the same record as previously,
1792
* the system will think the record is valid
1795
* Without re-reading the record the sequential
1796
* scan or other read will find it on the variation list, and
1797
* return the record data as if valid!
1799
* ------------ 2008-01-03
1801
* An example of this is:
1803
* Assume we have 3 records.
1804
* The 3rd record is deleted, and committed.
1805
* Before cleanup can be performed
1806
* a sequential scan takes a copy of the records.
1808
* Now assume a new insert is done before
1809
* the sequential scan gets to the 3rd record.
1811
* The insert allocates the 3rd row and 3rd record
1814
* Now, when the sequential scan gets to the old copy of the 3rd record,
1815
* this is valid because the row points to this record again.
1817
* HOWEVER! I have now changed the sequential scan so that it accesses
1818
* the records from the cache, without making a copy.
1820
* This means that this problem cannot occur because the sequential scan
1821
* always reads the current data from the cache.
1823
* There is also no race condition (although no lock is taken), because
1824
* the record is writen before the row (see here [(5)]).
1826
* This means that the row does not point to the record before the
1827
* record has been modified.
1829
* Once the record has been modified then the sequential scan will see
1830
* that the record belongs to a new transaction.
1832
* If the row pointer was set before the record updated then a race
1833
* condition would exist when the sequential scan reads the record
1834
* after the insert has updated the row pointer but before it has
1835
* changed the record.
1839
* I believe I can remove the delayed free record!
1841
* This means I can combine the REMOVE and FREE operations.
1843
* This is good because this takes care of the problem
1844
* that records are lost when:
1846
* The server crashes when the delayed free list still has items on it.
1848
* The transaction that freed the records has been cleaned, and this
1849
* fact has been committed to the log.
1851
* So I have removed the delay here: [(6)]
1853
* ------------ 2008-12-03
1855
* This code to delay removal of records was finally removed (see above)
1859
* As above, but instead a transaction is added to the "to free" queue.
1861
* It is important that transactions remain in memory until all
1862
* currently running transactions have ended. This is because
1863
* sequential and index scans have copies of old data.
1865
* In the old data a record may not be indicated as cleaned. Such
1866
* a record is considered invalid if the transaction is not in RAM.
1870
* And this problem is demonstrated by the following example
1871
* which was derived from flush_table.test.
1873
* Each handler command below is a separate transaction.
1874
* However the buffer is loaded by 'read first'.
1875
* Depending on when cleanup occurs, records can disappear
1876
* in some of the next commands.
1878
* 2 solutions for the test. Use begin ... commit around
1879
* handler open ... close. Or use analyze table t1 before
1880
* open. analyze table waits for the sweeper to complete!
1882
* create table dummy(table_id char(20) primary key);
1886
* drop table if exists t1;
1887
* create table t1(table_id char(20) primary key);
1888
* insert into t1 values ('Record-01');
1889
* insert into t1 values ('Record-02');
1890
* insert into t1 values ('Record-03');
1891
* insert into t1 values ('Record-04');
1892
* insert into t1 values ('Record-05');
1894
* handler t1 read first limit 1;
1895
* handler t1 read next limit 1;
1896
* handler t1 read next limit 1;
1897
* handler t1 read next limit 1;
1904
#ifdef MUST_DELAY_REMOVE
1905
static void xn_sw_add_xact_to_free(XTThreadPtr self, XNSweeperStatePtr ss, xtXactID xn_id)
1907
XNSWToFreeItemRec free_item;
1909
if ((ss->ss_to_free.bq_front - ss->ss_to_free.bq_back) >= XT_TN_MAX_TO_FREE) {
1910
/* If the queue is full, try to free some items:
1911
* We use the call count to avoid doing this every time,
1912
* when the queue overflows!
1914
if ((ss->ss_call_cnt % XT_TN_MAX_TO_FREE_CHECK) == 0)
1915
/* GOTCHA: This call was not locking the sweeper,
1916
* this could cause failure, of course:
1918
xn_sw_service_to_free(self, ss, TRUE);
1922
free_item.ri_wait_xn_id = ss->ss_db->db_xn_curr_id;
1923
free_item.ri_tab_id = 0;
1924
free_item.x.ri_xn_id = xn_id;
1926
xt_bq_add(self, &ss->ss_to_free, &free_item);
1930
static void xt_sw_delete_variations(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtXactID xn_id)
1932
xtRecordID prev_var_rec_id;
1935
switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, FALSE, row_id, xn_id)) {
1942
rec_id = prev_var_rec_id;
1946
static void xt_sw_delete_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtBool clean_delete, xtRowID row_id, xtXactID xn_id)
1948
xtRecordID prev_var_rec_id;
1950
switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, clean_delete, row_id, xn_id)) {
1961
/* Set rec_type to this value in order to force cleanup, without
1964
#define XN_FORCE_CLEANUP XT_TAB_STATUS_FREED
1967
* Read the record to be cleaned. Return TRUE if the cleanup has already been done.
1969
static xtBool xn_sw_cleanup_done(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtXactID xn_id, u_int rec_type, u_int stat_id, xtRowID row_id, XTTabRecHeadDPtr rec_head)
1971
if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) rec_head))
1974
if (rec_type == XN_FORCE_CLEANUP) {
1975
if (XT_REC_IS_FREE(rec_head->tr_rec_type_1))
1979
/* Transaction must match: */
1980
if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
1983
/* Record header must match expected value from
1984
* log or clean has been done, or is not required.
1986
* For example, it is not required if a record
1987
* has been overwritten in a transaction.
1989
if (rec_head->tr_rec_type_1 != rec_type ||
1990
rec_head->tr_stat_id_1 != stat_id)
1993
/* Row must match: */
1994
if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
2001
static void xn_sw_clean_indices(XTThreadPtr XT_NDEBUG_UNUSED(self), XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtWord1 *rec_data, xtWord1 *rec_buffer)
2003
XTTableHPtr tab = ot->ot_table;
2007
if (!tab->tab_dic.dic_key_count)
2010
cols_req = tab->tab_dic.dic_ind_cols_req;
2011
if (XT_REC_IS_FIXED(rec_data[0]))
2012
rec_buffer = rec_data + XT_REC_FIX_HEADER_SIZE;
2014
if (XT_REC_IS_VARIABLE(rec_data[0])) {
2015
if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buffer, cols_req))
2018
else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
2020
if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
2021
if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buffer, cols_req))
2025
if (rec_data != ot->ot_row_rbuffer)
2026
memcpy(ot->ot_row_rbuffer, rec_data, tab->tab_dic.dic_rec_size);
2027
if (!xt_tab_load_ext_data(ot, rec_id, rec_buffer, cols_req))
2032
/* This is possible, the record has already been cleaned up. */
2036
ind = tab->tab_dic.dic_keys;
2037
for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, ind++) {
2038
if (!xt_idx_update_row_id(ot, *ind, rec_id, row_id, rec_buffer))
2039
xt_log_and_clear_exception_ns();
2044
xt_log_and_clear_exception_ns();
2048
* Return TRUE if the cleanup was done. FAILED if cleanup could not be done
2049
* because dictionary information is not available.
2051
static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact, xtTableID tab_id, xtRecordID rec_id, u_int status, u_int rec_type, u_int stat_id, xtRowID row_id, xtWord1 *rec_buf)
2055
XTTabRecHeadDRec rec_head;
2056
xtRecordID after_rec_id;
2058
xtBool skip_cleanup;
2060
if (!(ot = xn_sw_get_open_table(self, ss, tab_id, &skip_cleanup)))
2061
/* The table no longer exists, consider cleanup done: */
2062
return skip_cleanup;
2065
ASSERT_NS(ot->ot_thread == self);
2067
/* Make sure the buffer is large enough! */
2068
xt_db_set_size(self, &ss->ss_databuf, (size_t) tab->tab_dic.dic_mysql_buf_size);
2070
xn_id = xact->xd_start_xn_id;
2071
if (xact->xd_flags & XT_XN_XAC_COMMITTED) {
2072
/* The transaction has been committed. Clean the record and
2073
* remove variations no longer in use.
2076
case XT_LOG_ENT_REC_MODIFIED:
2077
case XT_LOG_ENT_UPDATE:
2078
case XT_LOG_ENT_UPDATE_FL:
2079
case XT_LOG_ENT_UPDATE_BG:
2080
case XT_LOG_ENT_UPDATE_FL_BG:
2081
if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
2083
after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
2084
xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
2085
rec_head.tr_rec_type_1 |= XT_TAB_STATUS_CLEANED_BIT;
2086
XT_SET_NULL_DISK_4(rec_head.tr_prev_rec_id_4);
2087
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_CLEANED, 0, rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head))
2089
xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
2091
case XT_LOG_ENT_INSERT:
2092
case XT_LOG_ENT_INSERT_FL:
2093
case XT_LOG_ENT_INSERT_BG:
2094
case XT_LOG_ENT_INSERT_FL_BG: {
2097
* DROP TABLE IF EXISTS t1;
2098
* CREATE TABLE t1 ( id int, name varchar(300)) engine=pbxt;
2101
* insert t1(id, name) values(1, "aaa");
2102
* update t1 set name=REPEAT('A', 300) where id = 1;
2107
* Because the type of record changes, from VARIABLE to
2108
* EXTENDED, the cleanup needs to take this into account.
2110
* The input new status value which is written here
2111
* depends on the first write to the record.
2112
* However, the second write changes the record status.
2114
* Previously we used a OR function to write the bit and
2115
* return the byte value of the result.
2117
* The write funtion now checks the record to be written
2118
* to make sure it matches the record that needs to be
2119
* cleaned. So OR'ing the bit is no longer required.
2123
* We have changed this to fix the following bug:
2127
* T2 insert record 100 in row 50
2129
* T1 updates row 50 and adds record 101
2131
* The sweeper does cleanup in order T1, T2, ...
2133
* The sweeper cleans T1 by removing record 100 from the
2134
* row 50 variation list.
2135
* This means that record 100 is free.
2137
* The sweeper cleans T2 by marking record 100 as clean.
2138
* !BUG! record 100 has already been freed!
2140
* To avoid this we have to check a record before
2141
* cleaning (as we do above for update in xn_sw_cleanup_done())
2142
* We check that the record is, in fact, the exact
2143
* record that was inserted.
2145
* This is now done be xt_tc_write_cond().
2149
rec_head.tr_rec_type_1 = rec_type | XT_TAB_STATUS_CLEANED_BIT;
2150
if(!tab->tab_recs.xt_tc_write_cond(self, ot->ot_rec_file, rec_id, rec_head.tr_rec_type_1, &op_seq, xn_id, row_id, stat_id, rec_type))
2151
/* this means record was not updated by xt_tc_write_bor and doesn't need to */
2153
if (!xt_xlog_modify_table(tab->tab_id, XT_LOG_ENT_REC_CLEANED_1, op_seq, 0, 0, rec_id, 1, &rec_head.tr_rec_type_1, self))
2155
xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
2158
case XT_LOG_ENT_DELETE:
2159
case XT_LOG_ENT_DELETE_FL:
2160
case XT_LOG_ENT_DELETE_BG:
2161
case XT_LOG_ENT_DELETE_FL_BG:
2162
if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
2164
after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
2165
xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
2166
xt_sw_delete_variation(self, ss, ot, rec_id, TRUE, row_id, xn_id);
2168
if (!xt_tab_free_row(ot, tab, row_id))
2175
/* The transaction has been aborted. Remove the variation from the
2176
* variation list. If this means the list is empty, then remove
2177
* the record as well.
2179
xtRecordID first_rec_id, next_rec_id, prev_rec_id;
2180
XTTabRecHeadDRec prev_rec_head;
2182
if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
2186
row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
2187
after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
2191
/* Now remove the record from the variation list,
2192
* (if it is still on the list).
2194
XT_TAB_ROW_WRITE_LOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
2196
/* Find the variation before the variation we wish to remove: */
2197
if (!(xt_tab_get_row(ot, row_id, &first_rec_id)))
2200
next_rec_id = first_rec_id;
2201
while (next_rec_id != rec_id) {
2203
/* The record was not found in the list (we are done) */
2204
XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
2207
if (!xt_tab_get_rec_data(ot, next_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &prev_rec_head)) {
2208
xt_log_and_clear_exception(self);
2212
prev_rec_id = next_rec_id;
2213
next_rec_id = XT_GET_DISK_4(prev_rec_head.tr_prev_rec_id_4);
2216
if (next_rec_id == rec_id) {
2217
/* The record was found on the list: */
2219
/* Unlink the deleted variation:
2220
* I have found the following sequence:
2222
* 17933 in use 1906112
2223
* 1906112 delete xact=2901 row=17933 prev=2419240
2224
* 2419240 delete xact=2899 row=17933 prev=2153360
2225
* 2153360 record-X C xact=2599 row=17933 prev=0 Xlog=151 Xoff=16824 Xsiz=100
2227
* Despite the following facts which should prevent chains from
2230
* --- Only one transaction can modify a row
2231
* at any one time. So it is not possible for a new change
2232
* to be linked onto an uncommitted change.
2234
* --- Transactions that modify the same row
2235
* twice do not allocate a new record for each change.
2237
* -- A change that has been
2238
* rolled back will not be linked onto. Instead
2239
* the new transaction will link to the last.
2242
* So if the sweeper is slow in doing its job
2243
* we can have the situation that a number of records
2244
* can refer to the last committed record of the
2247
* Only one will be reference by the row pointer.
2249
* The other, will all have been rolled back.
2250
* This occurs over here: [(4)]
2252
XT_SET_DISK_4(prev_rec_head.tr_prev_rec_id_4, after_rec_id);
2253
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_UNLINKED, 0, prev_rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &prev_rec_head))
2257
/* Variation to be removed at the front of the list. */
2258
ASSERT(rec_id == first_rec_id);
2260
/* Unlink the deleted variation, from the front of the list: */
2261
if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_SET, row_id, after_rec_id))
2265
/* No more variations, remove the row: */
2266
if (!xt_tab_free_row(ot, tab, row_id))
2272
XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
2274
/* Note: even when not found on the row list, the record must still
2277
* There might be an exception to this, but there are very definite
2278
* cases where this is required, for example when an unreferenced
2279
* record is found and added to the clean up list xn_add_cu_record().
2283
/* Delete the extended record and index entries:
2285
* NOTE! This must be done after we have release the row lock. Because
2286
* a thread that does a duplicate check locks the index, and then
2287
* check whether a row is valid, and can deadlock with
2288
* code that locks a row, then an index!
2290
* However, this should all be OK, because the variation has been removed from the
2291
* row variation list at this stage, and now just need to be deleted.
2293
xt_sw_delete_variation(self, ss, ot, rec_id, FALSE, row_id, xn_id);
2300
XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
2305
/* Go through all updated records of a transaction and cleanup.
2306
* This means, of the transaction was aborted, then all the variations written
2307
* by the transaction must be removed.
2308
* If the transaction was committed then we remove older variations.
2309
* If a delete was committed this can lead to the row being removed.
2311
* After a transaction has been cleaned it can be removed from RAM.
2312
* If this was the last transaction in a log, and the log has reached
2313
* threshold, and the log is no longer in exclusive use, then the log
2316
* This function returns OK if the transaction was cleaned up, FALSE
2317
* if a retry is required. Othersize an error is thrown.
2319
// Clean up one transaction by reading its log records sequentially from
// its first log position (xact->xd_begin_log / xact->xd_begin_offset):
//  - XT_LOG_ENT_NEW_LOG: the reader continues at offset 0 of the log ID
//    stored in the record.
//  - XT_LOG_ENT_COMMIT / XT_LOG_ENT_ABORT: the record's transaction ID is
//    compared with this transaction's ID (presumably ending the scan on a
//    match).
//  - update/insert/delete records (plain, _BG, _FL and _FL_BG forms) that
//    belong to this transaction are passed to xn_sw_cleanup_variation().
// Reaching the end of the log is only expected for recovered transactions
// (asserted below).
// Afterwards an XT_LOG_ENT_CLEANUP record is appended with
// XT_XLOG_NO_WRITE_NO_FLUSH and ss->ss_flush_pending is set, so the sweeper
// flushes the log later. The transaction is flagged XT_XN_XAC_CLEANED and
// (presumably depending on MUST_DELAY_REMOVE) either queued for freeing or
// deleted from RAM, in which case db_xn_min_ram_id is advanced.
// Returns OK if the transaction was cleaned, FALSE if a retry is required.
// NOTE(review): braces, break/return statements and error throws are
// missing from this listing, so the exact exit paths cannot be read here.
static xtBool xn_sw_cleanup_xact(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact)
2321
XTDatabaseHPtr db = ss->ss_db;
2322
XTXactLogBufferDPtr record;
2328
if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, xact->xd_begin_log, xact->xd_begin_offset, FALSE))
2335
xn_sw_could_go_faster(self, db);
2337
if (!db->db_xlog.xlog_seq_next(&ss->ss_seqread, &record, FALSE, self))
2340
/* Recovered transactions are considered cleaned when we
2341
* reach the end of the transaction log.
2342
* This is required, because transactions that do
2343
* not have a commit (or rollback) record, because they were
2344
* running when the server last went down, will otherwise not
2345
* have the cleanup completed!!
2347
ASSERT(xact->xd_flags & XT_XN_XAC_RECOVERED);
2348
if (!(xact->xd_flags & XT_XN_XAC_RECOVERED))
2352
switch (record->xh.xh_status_1) {
2353
case XT_LOG_ENT_NEW_LOG:
2354
if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, FALSE))
2357
case XT_LOG_ENT_COMMIT:
2358
case XT_LOG_ENT_ABORT:
2359
xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
2360
if (xn_id == xact->xd_start_xn_id)
2363
case XT_LOG_ENT_REC_MODIFIED:
2364
case XT_LOG_ENT_UPDATE:
2365
case XT_LOG_ENT_INSERT:
2366
case XT_LOG_ENT_DELETE:
2367
case XT_LOG_ENT_UPDATE_BG:
2368
case XT_LOG_ENT_INSERT_BG:
2369
case XT_LOG_ENT_DELETE_BG:
2370
xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
2371
if (xn_id != xact->xd_start_xn_id)
2373
tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
2374
rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
2375
row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
2376
if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xu.xu_status_1, record->xu.xu_rec_type_1, record->xu.xu_stat_id_1, row_id, &record->xu.xu_rec_type_1))
2379
case XT_LOG_ENT_UPDATE_FL:
2380
case XT_LOG_ENT_INSERT_FL:
2381
case XT_LOG_ENT_DELETE_FL:
2382
case XT_LOG_ENT_UPDATE_FL_BG:
2383
case XT_LOG_ENT_INSERT_FL_BG:
2384
case XT_LOG_ENT_DELETE_FL_BG:
2385
xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
2386
if (xn_id != xact->xd_start_xn_id)
2388
tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
2389
rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
2390
row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
2391
if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xf.xf_status_1, record->xf.xf_rec_type_1, record->xf.xf_stat_id_1, row_id, &record->xf.xf_rec_type_1))
2400
/* Write the log to indicate the transaction has been cleaned: */
2401
XTXactCleanupEntryDRec cu;
2403
cu.xc_status_1 = XT_LOG_ENT_CLEANUP;
2404
cu.xc_checksum_1 = XT_CHECKSUM_1(XT_CHECKSUM4_XACT(xact->xd_start_xn_id));
2405
XT_SET_DISK_4(cu.xc_xact_id_4, xact->xd_start_xn_id);
2407
if (!xt_xlog_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, XT_XLOG_NO_WRITE_NO_FLUSH))
2410
ss->ss_flush_pending = TRUE;
2412
xact->xd_flags |= XT_XN_XAC_CLEANED;
2413
#ifndef XT_SWEEPER_SORT_XACTS
2414
ASSERT(db->db_xn_to_clean_id == xact->xd_start_xn_id);
2416
#ifdef MUST_DELAY_REMOVE
2417
xn_sw_add_xact_to_free(self, ss, xact->xd_start_xn_id);
2419
xn_id = xact->xd_start_xn_id;
2420
if (xt_xn_delete_xact(db, xn_id, self)) {
2421
/* Recalculate the minimum memory transaction: */
2422
ASSERT(!xt_xn_is_before(xn_id, db->db_xn_min_ram_id));
2424
if (db->db_xn_min_ram_id == xn_id) {
2425
db->db_xn_min_ram_id = xn_id+1;
2428
xtXactID xn_curr_xn_id = xt_xn_get_curr_id(db);
2430
while (!xt_xn_is_before(xn_curr_xn_id, db->db_xn_min_ram_id)) { // was db->db_xn_min_ram_id <= xn_curr_xn_id
2431
/* db_xn_min_ram_id may be changed, by some other process! */
2432
xn_id = db->db_xn_min_ram_id;
2433
if (xn_get_xact_details(db, xn_id, self, NULL, NULL, NULL, NULL))
2435
db->db_xn_min_ram_id = xn_id+1;
2444
// Cleanup routine registered with alloczr_() in xn_sw_main() for the
// sweeper state. It closes any table the sweeper still has open, exits the
// sequential log reader, and resizes both the data buffer and the to-free
// queue to zero.
// NOTE(review): one source line between the first two calls is missing
// from this listing (possibly a guard on ss->ss_db); confirm against the
// original source.
static void xn_free_sw_state(XTThreadPtr self, XNSweeperStatePtr ss)
2446
xn_sw_close_open_table(self, ss);
2448
ss->ss_db->db_xlog.xlog_seq_exit(&ss->ss_seqread);
2449
xt_db_set_size(self, &ss->ss_databuf, 0);
2450
xt_bq_set_size(self, &ss->ss_to_free, 0);
2453
#ifdef XT_SWEEPER_SORT_XACTS
2454
// qsort() comparator for the sweeper's sort list (XT_SWEEPER_SORT_XACTS).
// Orders transactions by end ID and breaks ties by start ID.
// Transaction IDs wrap around, so the ordering uses xt_xn_is_before(a, b)
// (meaning "a < b" with wrap-around, as in the rest of this file) rather
// than a raw '<'. A raw '<' would mis-order transactions that straddle the
// wrap point.
static int xn_compare_xact(const void *a, const void *b)
2456
register XTXactDataPtr b_a = *((XTXactDataPtr *) a);
2457
register XTXactDataPtr b_b = *((XTXactDataPtr *) b);
2459
if (b_a->xd_end_xn_id == b_b->xd_end_xn_id) {
2460
if (xt_xn_is_before(b_a->xd_start_xn_id, b_b->xd_start_xn_id))
2464
if (xt_xn_is_before(b_a->xd_end_xn_id, b_b->xd_end_xn_id))
2470
// Main loop of the sweeper thread.
// Repeatedly selects the next transaction to clean. Without
// XT_SWEEPER_SORT_XACTS this is db_xn_to_clean_id; with it, a list of
// completed, logged transactions is kept and sorted by end ID.
// A transaction is only swept when all of the following hold:
//  - XT_XN_XAC_SWEEP is set;
//  - it is not XT_XN_XAC_PREPARED;
//  - no transaction that started before its end is still running.
// Logged transactions are cleaned with xn_sw_cleanup_xact(); read-only ones
// are just removed from memory.
// XT_SW_TOO_FAR_BEHIND in db_sw_faster is set when the sweeper lags (more
// than 250 IDs behind, or a full sort list) and cleared when it catches up.
// After about 2 seconds of idle time, pending cleanup records are flushed,
// unless a flusher thread is running. Between passes the thread polls via
// xn_sw_wait_for_xact() every 1/10 second. The sweeper state is released by
// xn_free_sw_state() through freer_().
static void xn_sw_main(XTThreadPtr self)
2472
XTDatabaseHPtr db = self->st_database;
2473
XNSweeperStatePtr ss;
2475
time_t idle_start = 0;
2477
#ifdef XT_SWEEPER_SORT_XACTS
2479
xtXactID next_clean_id;
2481
XTXactDataPtr xact2;
2484
xt_set_priority(self, xt_db_sweeper_priority);
2486
alloczr_(ss, xn_free_sw_state, sizeof(XNSweeperStateRec), XNSweeperStatePtr);
2489
if (!db->db_xlog.xlog_seq_init(&ss->ss_seqread, xt_db_log_buffer_size, FALSE))
2492
ss->ss_to_free.bq_item_size = sizeof(XNSWToFreeItemRec);
2493
ss->ss_to_free.bq_max_waste = XT_TN_MAX_TO_FREE_WASTE;
2494
ss->ss_to_free.bq_item_inc = XT_TN_MAX_TO_FREE_INC;
2495
ss->ss_call_cnt = 0;
2496
ss->ss_flush_pending = FALSE;
2498
while (!self->t_quit) {
2499
while (!self->t_quit) {
2500
curr_id = xt_xn_get_curr_id(db);
2502
#ifdef XT_SWEEPER_SORT_XACTS
2503
/* Add transactions to the list if required: */
2504
while (db->db_sw_list_size < XT_SW_XACT_SORT_LIST_SIZE &&
2505
!xt_xn_is_before(curr_id, db->db_sw_to_add)) {
2506
if ((xact = xt_xn_get_xact(db, db->db_sw_to_add, self))) {
2507
/* Only add transactions that have completed: */
2508
if (!(xact->xd_flags & XT_XN_XAC_SWEEP))
2511
/* Add only transactions that did an update to the list: */
2512
if ((xact->xd_flags & XT_XN_XAC_LOGGED)) {
2513
db->db_sw_xact_list[db->db_sw_list_size] = xact;
2514
db->db_sw_list_size++;
2517
/* Should not be required (done by the transaction itself)! */
2518
if (xt_xn_delete_xact(db, db->db_sw_to_add, self)) {
2519
if (db->db_xn_min_ram_id == db->db_sw_to_add)
2520
db->db_xn_min_ram_id = db->db_sw_to_add+1;
2525
/* If there are no transactions to be cleaned, then the
2526
* next to clean will be at least the next one to check.
2528
if (!db->db_sw_list_size)
2529
db->db_xn_to_clean_id = db->db_sw_to_add;
2532
if (!db->db_sw_list_size) {
2533
/* Nothing to do: */
2534
db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
// A full sort list means more transactions are waiting than the list can
// hold, so the sweeper is falling behind. (The test used to be written as
// "!size == SIZE", which parses as "(!size) == SIZE" and was never true.)
2538
if (db->db_sw_list_size == XT_SW_XACT_SORT_LIST_SIZE)
2539
db->db_sw_faster |= XT_SW_TOO_FAR_BEHIND;
2540
xn_sw_could_go_faster(self, db);
2543
/* Sort the transactions, according to their end order: */
2544
qsort(db->db_sw_xact_list, db->db_sw_list_size, sizeof(XTXactDataPtr), xn_compare_xact);
2546
for (i=0; i<db->db_sw_list_size; i++) {
2547
xact = db->db_sw_xact_list[i];
2548
if (!xt_xn_is_before(xact->xd_end_xn_id, db->db_sw_to_add)) // xact->xd_end_xn_id >= db->db_sw_to_add
2551
if (!xn_sw_cleanup_xact(self, ss, xact)) {
2552
/* We failed to clean (try again later)... */
2553
#ifdef TRACE_SWEEPER_ACTIVITY
2554
printf("SWEEPER: cleanup retry %d...\n", (int) xact->xd_start_xn_id);
2560
if (i == db->db_sw_list_size) {
2561
/* All cleaned out: */
2562
db->db_xn_to_clean_id = db->db_sw_to_add;
2563
db->db_sw_list_size = 0;
2568
/* The next to clean will be the smallest still in the
2571
* NOTE: db_xn_to_clean_id means that all transactions
2572
* before this are clean.
2574
* It may be that some after this point have also
2577
next_clean_id = db->db_sw_xact_list[i]->xd_start_xn_id;
2578
for (j=i+1; j<db->db_sw_list_size; j++) {
2579
if (xt_xn_is_before(db->db_sw_xact_list[j]->xd_start_xn_id, next_clean_id))
2580
next_clean_id = db->db_sw_xact_list[j]->xd_start_xn_id;
2582
db->db_xn_to_clean_id = next_clean_id;
2585
/* Something to do, but nothing done! */
2586
if ((xact = xt_xn_get_xact(db, db->db_sw_to_add, self))) {
2587
/* Before we go to sleep, lets just check again: */
2588
if (!(xact->xd_flags & XT_XN_XAC_SWEEP)) {
2589
db->db_stat_sweep_waits++;
2595
memmove(db->db_sw_xact_list, &db->db_sw_xact_list[i], (db->db_sw_list_size - i) * sizeof(XTXactDataPtr));
2596
db->db_sw_list_size -= i;
2599
/* We are just about to check the condition for sleeping,
2600
* so if the condition for sleeping holds, then we will
2601
* exit the loop and sleep.
2603
* We will then sleep if nobody sets the flag before we
2604
* actually do sleep!
2606
if (xt_xn_is_before(curr_id, db->db_xn_to_clean_id)) {
2607
db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
2611
/* {TUNING} How far do we allow the sweeper to get behind?
2612
* The higher this is, the higher burst performance can
2613
* be. But too high and the sweeper falls out of reading the
2614
* transaction log cache, and also starts to spread
2615
* changes around in index and data blocks that are no
2618
if (curr_id - db->db_xn_to_clean_id > 250)
2619
db->db_sw_faster |= XT_SW_TOO_FAR_BEHIND;
2621
db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
2622
xn_sw_could_go_faster(self, db);
2625
if ((xact = xt_xn_get_xact(db, db->db_xn_to_clean_id, self))) {
2628
/* The sweep flag is set when the transaction is ready for sweeping.
2629
* Prepared transactions may not be swept!
2631
if (!(xact->xd_flags & XT_XN_XAC_SWEEP) || (xact->xd_flags & XT_XN_XAC_PREPARED))
2634
/* Check if we can cleanup the transaction.
2635
* We do this by checking to see if there is any running
2636
* transaction which start before the end of this transaction.
2638
xn_id = xact->xd_start_xn_id;
2639
while (xt_xn_is_before(xn_id, xact->xd_end_xn_id)) {
2641
if ((xact2 = xt_xn_get_xact(db, xn_id, self))) {
2642
if (!(xact2->xd_flags & XT_XN_XAC_ENDED)) {
2643
/* A transaction was started before the end of
2644
* the transaction we wish to sweep, and this
2645
* transaction has not committed, then we have to
2648
db->db_stat_sweep_waits++;
2654
/* Can cleanup the transaction, and move to the next. */
2655
if (xact->xd_flags & XT_XN_XAC_LOGGED) {
2656
#ifdef TRACE_SWEEPER_ACTIVITY
2657
printf("SWEEPER: cleanup %d\n", (int) xact->xd_start_xn_id);
2659
if (!xn_sw_cleanup_xact(self, ss, xact)) {
2660
/* We failed to clean (try again later)... */
2661
#ifdef TRACE_SWEEPER_ACTIVITY
2662
printf("SWEEPER: cleanup retry %d...\n", (int) xact->xd_start_xn_id);
2666
#ifdef TRACE_SWEEPER_ACTIVITY
2667
printf("SWEEPER: cleanup %d DONE\n", (int) xact->xd_start_xn_id);
2671
/* This was a read-only transaction, it is safe to
2672
* just remove the transaction structure from memory.
2673
* (should not be necessary because RO transactions
2674
* do this themselves):
2676
if (xt_xn_delete_xact(db, db->db_xn_to_clean_id, self)) {
2677
if (db->db_xn_min_ram_id == db->db_xn_to_clean_id)
2678
db->db_xn_min_ram_id = db->db_xn_to_clean_id+1;
2683
/* Move on to clean the next: */
2684
db->db_xn_to_clean_id++;
2690
xn_sw_close_open_table(self, ss);
2692
xn_sw_go_slower(self, db);
2694
/* Shrink the free list, if it is empty, and larger than
2697
if (ss->ss_to_free.bq_size > XT_TN_MAX_TO_FREE) {
2698
if (ss->ss_to_free.bq_front == 0 && ss->ss_to_free.bq_back == 0)
2699
xt_bq_set_size(self, &ss->ss_to_free, XT_TN_MAX_TO_FREE);
2702
/* Windows: close the log file that we have open for reading, if we
2703
* read past the end of the log on the last transaction.
2704
* This makes sure that the log is closed when the checkpointer
2705
* tries to remove or rename it!!
2707
if (ss->ss_seqread.xseq_log_file) {
2708
if (ss->ss_seqread.xseq_rec_log_id != ss->ss_seqread.xseq_log_id)
2709
db->db_xlog.xlog_seq_close(&ss->ss_seqread);
2712
if (ss->ss_flush_pending) {
2713
/* Flush pending means we have written something to the log.
2715
* if so we flush the log so that the writer will also do
2718
* This will lead to the freeer continuing if it is waiting.
2721
time_t now = time(NULL);
2723
/* By default, we wait for 2 seconds idle time, then
2726
if (now >= idle_start + 2) {
2727
/* Don't do this if flusher is active! */
2728
if (!db->db_fl_thread &&
2729
!xt_xlog_flush_log(db, self))
2731
ss->ss_flush_pending = FALSE;
2738
/* {WAKE-SW} Waking up the sweeper is very expensive!
2739
* Cost is 3% of execution time on the test:
2740
* runTest(SMALL_SELECT_TEST, 2, 100000)
2742
* On the other hand, polling every 1/10 second
2743
* is cheap, because the check for transactions
2744
* ready for cleanup is very quick.
2746
* So this is the preferred method.
2748
xn_sw_wait_for_xact(self, db, 10);
2751
if (ss->ss_flush_pending) {
2752
xt_xlog_flush_log(db, self);
2753
ss->ss_flush_pending = FALSE;
2756
freer_(); // xn_free_sw_state(ss)
2759
// Entry point of the sweeper daemon thread, started by xt_start_sweeper().
// self->t_data holds the database (set there with xt_set_thread_data()).
// A MySQL thread context is created first. Then, until the thread is asked
// to quit, the database is opened for sweeper use and this thread's heap
// reference to it is released (see {BACKGROUND-RELEASE-DB}).
// Exceptions are logged unless they are XT_ERR_NO_DICTIONARY or a caught
// SIGTERM. After an error, db_sw_idle is set to XT_THREAD_INERR while the
// thread pauses, then back to XT_THREAD_BUSY. The MySQL thread context is
// destroyed at the end.
// NOTE(review): the call into xn_sw_main() and the try/catch structure are
// missing from this listing; the summary above is inferred from the
// visible statements.
static void *xn_sw_run_thread(XTThreadPtr self)
2761
XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data;
2765
if (!(mysql_thread = myxt_create_thread()))
2768
while (!self->t_quit) {
2771
* The garbage collector requires that the database
2772
* is in use because.
2774
xt_use_database(self, db, XT_FOR_SWEEPER);
2776
/* {BACKGROUND-RELEASE-DB}
2777
* This action is both safe and required:
2779
* safe: releasing the database is safe because as
2780
* long as this thread is running the database
2781
* reference is valid, and this reference cannot
2782
* be the only one to the database because
2783
* otherwise this thread would not be running.
2785
* required: releasing the database is necessary
2786
* otherwise we cannot close the database
2787
* correctly because we only shutdown this
2788
* thread when the database is closed and we
2789
* only close the database when all references
2792
xt_heap_release(self, self->st_database);
2797
/* This error is "normal"! */
2798
if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
2799
!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
2800
self->t_exception.e_sys_err == SIGTERM))
2801
xt_log_and_clear_exception(self);
2805
/* Avoid releasing the database (done above) */
2806
self->st_database = NULL;
2807
xt_unuse_database(self, self);
2809
/* After an exception, pause before trying again... */
2810
/* Number of seconds */
2816
db->db_sw_idle = XT_THREAD_INERR;
2817
while (!self->t_quit && count > 0) {
2821
db->db_sw_idle = XT_THREAD_BUSY;
2825
* {MYSQL-THREAD-KILL}
2826
myxt_destroy_thread(mysql_thread, TRUE);
2831
// Thread-data free callback registered for the sweeper thread by
// xt_start_sweeper() via xt_set_thread_data(). data is the database passed
// there. If the database still refers to a sweeper thread, the reference is
// cleared while db_sw_lock is held.
static void xn_sw_free_thread(XTThreadPtr self, void *data)
2833
XTDatabaseHPtr db = (XTDatabaseHPtr) data;
2835
if (db->db_sw_thread) {
2836
xt_lock_mutex(self, &db->db_sw_lock);
2837
pushr_(xt_unlock_mutex, &db->db_sw_lock);
2838
db->db_sw_thread = NULL;
2839
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2843
/* Sleep until the sweeper is woken or the timeout expires.
* hsecs appears to be in hundredths of a second: hsecs * 10 is passed as
* the timeout (presumably milliseconds), and xn_sw_main() passes 10 for
* its 1/10 second poll. No wait happens if the thread is quitting or
* db_sw_faster is set. db_sw_idle is XT_THREAD_IDLE while waiting; this is
* the flag xt_wakeup_sweeper() checks before broadcasting db_sw_cond.
* db_sw_check_count is incremented on every call.
*/
2844
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs)
2846
xt_lock_mutex(self, &db->db_sw_lock);
2847
pushr_(xt_unlock_mutex, &db->db_sw_lock);
2848
db->db_sw_idle = XT_THREAD_IDLE;
2849
if (!self->t_quit && !db->db_sw_faster)
2850
xt_timed_wait_cond(self, &db->db_sw_cond, &db->db_sw_lock, hsecs * 10);
2851
db->db_sw_idle = XT_THREAD_BUSY;
2852
db->db_sw_check_count++;
2853
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2856
// Create and start the sweeper daemon thread for db.
// The thread is named "SW-<last directory of db->db_main_path>".
// snprintf() keeps the name inside the PATH_MAX buffer even when the
// directory name is very long; "SW-" plus a near-PATH_MAX component would
// otherwise overrun it.
// The database is attached as thread data with xn_sw_free_thread() as its
// free callback, and the thread runs xn_sw_run_thread().
xtPublic void xt_start_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
2858
char name[PATH_MAX];
2860
snprintf(name, sizeof(name), "SW-%s", xt_last_directory_of_path(db->db_main_path));
2861
xt_remove_dir_char(name);
2862
db->db_sw_thread = xt_create_daemon(self, name);
2863
xt_set_thread_data(db->db_sw_thread, db, xn_sw_free_thread);
2864
xt_run_thread(self, db->db_sw_thread, xn_sw_run_thread);
2867
// Wait until the sweeper has cleaned every transaction up to
// db->db_xn_curr_id as read on entry. Judging by the "Initial sweep" log
// messages, this runs at start-up.
// The sweeper is woken and the state polled every 10 ms. Once the wait has
// lasted more than about 2 seconds, progress is logged as percentages from
// 1 to 100, followed by a completion message.
xtPublic void xt_init_sweeper_wait(XTThreadPtr self, XTDatabaseHPtr db)
2869
xtXactID current_id;
2871
current_id = db->db_xn_curr_id;
2872
if (!xt_xn_is_before(current_id, db->db_xn_to_clean_id)) {
2873
double init_diff, curr_done = 0;
2875
xtBool print_progress = FALSE;
2876
int perc_to_print = 1;
2878
init_diff = (double) xt_xn_diff(current_id, db->db_xn_to_clean_id);
2879
start_time = time(NULL);
2881
xt_logf(XT_NT_INFO, "PBXT: Initial sweep, transactions to scan: %llu\n", (u_llong) init_diff);
2883
while (!xt_xn_is_before(current_id, db->db_xn_to_clean_id)) { // means: db->db_xn_to_clean_id <= current_id
2884
xt_lock_mutex(self, &db->db_sw_lock);
2885
pushr_(xt_unlock_mutex, &db->db_sw_lock);
2886
xt_wakeup_sweeper(db);
2887
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2889
xt_sleep_milli_second(10);
2891
if (!print_progress) {
2892
if (time(NULL) - start_time > 2)
2893
print_progress = TRUE;
2896
if (print_progress) {
2897
curr_done = init_diff - (double) xt_xn_diff(current_id, db->db_xn_to_clean_id);
// Guard against init_diff == 0 (the wait started with
// db_xn_to_clean_id == current_id). Then 0.0 / 0.0 is NaN, and
// converting NaN to int is undefined behaviour.
2898
while (init_diff > 0 && perc_to_print <= (int) (curr_done / init_diff * 100)) {
2899
if (((perc_to_print - 1) % 25) == 0)
2900
xt_logf(XT_NT_INFO, "PBXT: ");
2901
if ((perc_to_print % 25) == 0)
2902
xt_logf(XT_NT_INFO, "%2d\n", perc_to_print);
2904
xt_logf(XT_NT_INFO, "%2d ", perc_to_print);
2911
if (print_progress) {
2912
while (perc_to_print <= 100) {
2913
if (((perc_to_print - 1) % 25) == 0)
2914
xt_logf(XT_NT_INFO, "PBXT: ");
2915
if ((perc_to_print % 25) == 0)
2916
xt_logf(XT_NT_INFO, "%2d\n", perc_to_print);
2918
xt_logf(XT_NT_INFO, "%2d ", perc_to_print);
2923
xt_logf(XT_NT_INFO, "PBXT: Initial sweep complete, transactions scanned: %llu\n", (u_llong) init_diff);
2927
// Block until the sweeper has cleaned all transactions up to
// db->db_xn_curr_id. Does nothing if there is no sweeper thread.
// The sweeper is woken and the state polled every 10 ms. If abort_time is
// non-zero, the wait is abandoned (with a log message) after abort_time
// time units, presumably seconds. A "Waiting for ... sweeper" message is
// logged after about 2 units and "Sweeper ... done." at the end.
// NOTE(review): the declarations and updates of now, then and message are
// missing from this listing; now/then are presumably time() values.
xtPublic void xt_wait_for_sweeper(XTThreadPtr self, XTDatabaseHPtr db, int abort_time)
2930
xtBool message = FALSE;
2932
if (db->db_sw_thread) {
2934
/* Changed xt_xn_get_curr_id(db) to db->db_xn_curr_id,
2935
* This should work because we are not concerned about the difference
2936
* between xt_xn_get_curr_id(db) and db->db_xn_curr_id,
2937
* Which is just a matter of when transactions we can expect to find
2938
* in memory (see {GAP-INC-ADD-XACT})
2940
while (!xt_xn_is_before(db->db_xn_curr_id, db->db_xn_to_clean_id)) { // was db->db_xn_to_clean_id <= xt_xn_get_curr_id(db)
2941
xt_lock_mutex(self, &db->db_sw_lock);
2942
pushr_(xt_unlock_mutex, &db->db_sw_lock);
2943
xt_wakeup_sweeper(db);
2944
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2945
xt_sleep_milli_second(10);
2947
if (abort_time && now >= then + abort_time) {
2948
xt_logf(XT_NT_INFO, "Aborting wait for '%s' sweeper\n", db->db_name);
2952
if (now >= then + 2) {
2955
xt_logf(XT_NT_INFO, "Waiting for '%s' sweeper...\n", db->db_name);
2961
xt_logf(XT_NT_INFO, "Sweeper '%s' done.\n", db->db_name);
2965
// Stop the sweeper thread, if one is running:
//  1. Under db_sw_lock, mark the thread for termination and wake it.
//  2. Release the lock and wait for the thread (by ID) to exit.
//  3. Clear db->db_sw_thread.
// NOTE(review): the "xt_kill_thread(thread)" lines below refer to an
// undeclared name and appear to be commented-out code; the enclosing
// comment delimiters are missing from this listing.
xtPublic void xt_stop_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
2969
if (db->db_sw_thread) {
2970
xt_lock_mutex(self, &db->db_sw_lock);
2971
pushr_(xt_unlock_mutex, &db->db_sw_lock);
2973
/* This pointer is safe as long as you hold db_sw_lock. */
2974
if ((thr_sw = db->db_sw_thread)) {
2975
xtThreadID tid = thr_sw->t_id;
2977
/* Make sure the thread quits when woken up. */
2978
xt_terminate_thread(self, thr_sw);
2980
xt_wakeup_sweeper(db);
2982
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
2985
* GOTCHA: This is a weird thing but the SIGTERM directed
2986
* at a particular thread (in this case the sweeper) was
2987
* being caught by a different thread and killing the server
2988
* sometimes. Disconcerting.
2989
* (this may only be a problem on Mac OS X)
2990
xt_kill_thread(thread);
2992
xt_wait_for_thread_to_exit(tid, FALSE);
2994
/* PMC - This should not be necessary to set the signal here, but in the
2995
* debugger the handler is not called!!?
2996
thr_sw->t_delayed_signal = SIGTERM;
2997
xt_kill_thread(thread);
2999
db->db_sw_thread = NULL;
3002
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
3006
xtPublic void xt_wakeup_sweeper(XTDatabaseHPtr db)
3008
/* This flag makes the gap for the race condition
3011
* However, this posibility still remains because
3012
* we do not lock the mutex db_sw_lock here.
3014
* The reason is that it is too expensive.
3016
* In the event that the wakeup is missed the sleeper
3017
* wait will timeout eventually.
3019
if (db->db_sw_idle) {
3020
if (!xt_broadcast_cond_ns(&db->db_sw_cond))
3021
xt_log_and_clear_exception_ns();