/* Copyright (c) 2005 PrimeBase Technologies GmbH
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * 2005-04-10	Paul McCullagh
 */
#include "xt_config.h"
33
#include "xaction_xt.h"
34
#include "database_xt.h"
35
#include "strutil_xt.h"
39
#include "tabcache_xt.h"
42
//#define TRACE_WAIT_FOR
43
//#define TRACE_VARIATIONS
44
//#define TRACE_SWEEPER_ACTIVITY
46
/* Enable to trace the statements executed by the engine: */
47
//#define TRACE_STATEMENTS
50
#if defined(TRACE_STATEMENTS) || defined(TRACE_VARIATIONS)
51
#define TRACE_TRANSACTION
54
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs);
static xtBool xn_get_xact_details(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), int *flags, xtXactID *start, xtWord4 *end, xtThreadID *thd_id);
static xtBool xn_get_xact_pointer(XTDatabaseHPtr db, xtXactID xn_id, XTXactDataPtr *xact_ptr);

/* ============================================================================================== */

typedef struct XNSWRecItem {
} XNSWRecItemRec, *XNSWRecItemPtr;

typedef struct XNSWToFreeItem {
	xtTableID	ri_tab_id;		/* If non-zero, this is the table of the data record to be freed.
						 * If zero, then the transaction below must be freed. */
	xtXactID	ri_wait_xn_id;		/* Wait for this transaction to be cleaned (or being cleaned up)
						 * before freeing this resource. */
} XNSWToFreeItemRec, *XNSWToFreeItemPtr;
/* ----------------------------------------------------------------------
 * WAIT FOR TRANSACTIONS
 */

typedef struct XNWaitFor {
	xtXactID	wf_waiting_xn_id;	/* The transaction of the waiting thread. */
	xtXactID	wf_for_me_xn_id;	/* The transaction we are waiting for. */
} XNWaitForRec, *XNWaitForPtr;

static int xn_compare_wait_for(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
{
	xtXactID	*x = (xtXactID *) a;
	XNWaitForPtr	y = (XNWaitForPtr) b;

	if (*x == y->wf_waiting_xn_id)
		return 0;
	if (xt_xn_is_before(*x, y->wf_waiting_xn_id))
		return -1;
	return 1;
}

static void xn_free_wait_for(XTThreadPtr XT_UNUSED(self), void *XT_UNUSED(thunk), void *XT_UNUSED(item))
{
}
/*
 * A deadlock occurs when a transaction is waiting for itself!
 * For example, A is waiting for B which is waiting for A.
 * By repeatedly scanning the wait_for list we can find out if a
 * transaction is waiting for itself.
 */
static xtBool xn_detect_deadlock(XTDatabaseHPtr db, xtXactID waiting, xtXactID for_me)
{
	XNWaitForPtr wf;

	for (;;) {
		if (waiting == for_me) {
#ifdef TRACE_WAIT_FOR
			for (u_int i=0; i<xt_sl_get_size(db->db_xn_wait_for); i++) {
				wf = (XNWaitForPtr) xt_sl_item_at(db->db_xn_wait_for, i);
				xt_trace("T%lu --> T%lu\n", (u_long) wf->wf_waiting_xn_id, (u_long) wf->wf_for_me_xn_id);
			}
			xt_ttracef(xt_get_self(), "DEADLOCK\n");
#endif
			xt_register_xterr(XT_REG_CONTEXT, XT_ERR_DEADLOCK);
			return TRUE;
		}
		if (!(wf = (XNWaitForPtr) xt_sl_find(NULL, db->db_xn_wait_for, &for_me)))
			break;
		for_me = wf->wf_for_me_xn_id;
	}
	return FALSE;
}
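/* Illustrative sketch (not part of the original source): the deadlock test above is
 * just a chase along "waiting -> for_me" edges. The helper below shows the same idea
 * on a plain array instead of the sorted list; the function name and the array
 * parameters are hypothetical.
 */
#if 0
static xtBool xn_example_would_deadlock(XNWaitForPtr edges, u_int count, xtXactID waiting, xtXactID for_me)
{
	/* Each step follows one wait-for edge; count+1 steps are enough to detect any cycle: */
	for (u_int hop = 0; hop < count + 1; hop++) {
		if (for_me == waiting)
			return TRUE;				/* We are (indirectly) waiting for ourselves. */
		u_int i;
		for (i = 0; i < count; i++) {
			if (edges[i].wf_waiting_xn_id == for_me)
				break;
		}
		if (i == count)
			return FALSE;				/* The chain ends without returning to us. */
		for_me = edges[i].wf_for_me_xn_id;
	}
	return FALSE;						/* A cycle exists, but it does not include us. */
}
#endif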
#ifdef XT_USE_SPINLOCK_WAIT_FOR

#if defined(XT_MAC) || defined(XT_WIN)
#define WAIT_SPIN_COUNT			10
#else
#define WAIT_SPIN_COUNT			50
#endif
/* Should not be required, but we wait for a second,
 * just in case the wakeup is missed!
 */
#ifdef DEBUG
#define WAIT_FOR_XACT_TIME		30000
#else
#define WAIT_FOR_XACT_TIME		1000
#endif
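/* Illustrative sketch (not part of the original source): the constants above are used
 * in a spin-then-block pattern. A waiter polls its condition WAIT_SPIN_COUNT times and
 * then falls back to a timed condition wait, so a missed wakeup costs at most
 * WAIT_FOR_XACT_TIME milliseconds. The callback and its argument are hypothetical.
 */
#if 0
static void xn_example_spin_then_block(XTWaitThreadPtr my_wt, xtBool (*still_waiting)(void *), void *data)
{
	u_int loop_count = 0;

	/* First spin, in the hope that the condition clears quickly: */
	while (loop_count < WAIT_SPIN_COUNT) {
		if (!still_waiting(data))
			return;
		loop_count++;
	}

	/* Then block, but only for a limited time, in case the wakeup is missed: */
	xt_lock_mutex_ns(&my_wt->wt_lock);
	if (still_waiting(data))
		xt_timed_wait_cond_ns(&my_wt->wt_cond, &my_wt->wt_lock, WAIT_FOR_XACT_TIME);
	xt_unlock_mutex_ns(&my_wt->wt_lock);
}
#endif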
static xtBool xn_add_to_wait_for(XTDatabaseHPtr db, XNWaitForPtr wf, XTThreadPtr thread)
151
/* If we are waiting for a transaction to end,
152
* put this thread on the wait list...
154
* As long as the temporary lock is removed
155
* or turned into a permanent lock before
156
* a thread waits again, all should be OK!
158
xt_spinlock_lock(&db->db_xn_wait_spinlock);
160
#ifdef TRACE_WAIT_FOR
161
xt_ttracef(thread, "T%lu -wait-> T%lu\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) wait_xn_id);
163
/* Check for a deadlock: */
164
if (xn_detect_deadlock(db, wf->wf_waiting_xn_id, wf->wf_for_me_xn_id))
167
/* We will wait for this transaction... */
168
db->db_xn_wait_count++;
169
if (thread->st_xact_writer)
170
db->db_xn_writer_wait_count++;
172
if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf->wf_waiting_xn_id, wf)) {
173
db->db_xn_wait_count--;
177
xt_spinlock_unlock(&db->db_xn_wait_spinlock);
181
xt_spinlock_unlock(&db->db_xn_wait_spinlock);
185
inline void xn_remove_from_wait_for(XTDatabaseHPtr db, XNWaitForPtr wf, XTThreadPtr thread)
187
xt_spinlock_lock(&db->db_xn_wait_spinlock);
189
xt_sl_delete(NULL, db->db_xn_wait_for, &wf->wf_waiting_xn_id);
190
db->db_xn_wait_count--;
191
if (thread->st_xact_writer)
192
db->db_xn_writer_wait_count--;
194
#ifdef TRACE_WAIT_FOR
195
xt_ttracef(thread, "T%lu -wait-> T%lu FAILED\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) wait_xn_id);
197
xt_spinlock_unlock(&db->db_xn_wait_spinlock);
200
/* Wait for a transaction to terminate or a lock to be granted.
 *
 * If term_req is TRUE, then the termination of the transaction is required
 * before this function returns.
 *
 * If pw_func is set then this function will not return before this call has
 * been made.
 *
 * This function returns FAILED on error.
 */
xtPublic xtBool xt_xn_wait_for_xact(XTThreadPtr thread, XTXactWaitPtr xw, XTLockWaitPtr lw)
212
XTDatabaseHPtr db = thread->st_database;
216
XTXactDataPtr wait_xact_ptr;
217
xtBool on_wait_list = FALSE;
218
XTXactWaitRec xw_new;
219
u_int loop_count = 0;
220
XTWaitThreadPtr my_wt;
222
ASSERT_NS(thread->st_xact_data);
223
thread->st_statistics.st_wait_for_xact++;
225
wf.wf_waiting_xn_id = thread->st_xact_data->xd_start_xn_id;
228
/* If we are here, then the lw structure is on the wait
229
* queue for the given lock.
231
xtXactID locking_xn_id;
234
locking_xn_id = lw->lw_xn_id;
235
wf.wf_for_me_xn_id = lw->lw_xn_id;
236
if (!xn_add_to_wait_for(db, &wf, thread)) {
237
lw->lw_ot->ot_table->tab_locks.xt_cancel_temp_lock(lw);
241
while (loop_count < WAIT_SPIN_COUNT) {
244
switch (lw->lw_curr_lock) {
246
xn_remove_from_wait_for(db, &wf, thread);
250
/* Check if we must also wait for the transaction: */
251
if (lw->lw_row_updated) {
252
/* This will override the xw passed in.
 * The reason is that we are actually waiting
 * for a lock, and the lock owner may have changed
 * while we were waiting for the lock.
 */
xw_new.xw_xn_id = lw->lw_updating_xn_id;
261
if (wf.wf_for_me_xn_id == xw->xw_xn_id)
264
xn_remove_from_wait_for(db, &wf, thread);
267
xn_remove_from_wait_for(db, &wf, thread);
271
if (locking_xn_id != lw->lw_xn_id) {
272
/* Change the transaction that we are waiting for: */
273
xn_remove_from_wait_for(db, &wf, thread);
274
goto wait_for_locker;
283
/* The non-spinning version... */
284
wait_for_locker_no_spin:
285
THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
286
my_wt = xt_thr_array[thread->t_id].td_waiting;
287
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
288
xt_lock_mutex_ns(&my_wt->wt_lock);
291
switch (lw->lw_curr_lock) {
293
xt_unlock_mutex_ns(&my_wt->wt_lock);
294
xn_remove_from_wait_for(db, &wf, thread);
297
xt_unlock_mutex_ns(&my_wt->wt_lock);
298
if (lw->lw_row_updated) {
299
xw_new.xw_xn_id = lw->lw_updating_xn_id;
303
if (wf.wf_for_me_xn_id == xw->xw_xn_id)
306
xn_remove_from_wait_for(db, &wf, thread);
309
xn_remove_from_wait_for(db, &wf, thread);
313
if (locking_xn_id != lw->lw_xn_id) {
314
/* Change the transaction that we are waiting for: */
315
xt_unlock_mutex_ns(&my_wt->wt_lock);
316
xn_remove_from_wait_for(db, &wf, thread);
317
locking_xn_id = lw->lw_xn_id;
318
wf.wf_for_me_xn_id = lw->lw_xn_id;
319
if (!xn_add_to_wait_for(db, &wf, thread)) {
320
lw->lw_ot->ot_table->tab_locks.xt_cancel_temp_lock(lw);
323
goto wait_for_locker_no_spin;
328
xt_timed_wait_cond_ns(&my_wt->wt_cond, &my_wt->wt_lock, WAIT_FOR_XACT_TIME);
332
xt_unlock_mutex_ns(&my_wt->wt_lock);
337
xtThreadID tn_thd_id;
340
wf.wf_for_me_xn_id = xw->xw_xn_id;
342
if (!xn_get_xact_pointer(db, xw->xw_xn_id, &wait_xact_ptr))
343
/* The transaction was not found... */
347
/* This is a dirty read, but it should work! */
348
flags = wait_xact_ptr->xd_flags;
349
start = wait_xact_ptr->xd_start_xn_id;
350
tn_thd_id = wait_xact_ptr->xd_thread_id;
354
if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, &tn_thd_id))
355
flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
358
if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
359
/* The transaction has terminated! */
362
/* Tell the thread we are waiting for it: */
363
xt_add_to_wakeup_list(thread->t_id, tn_thd_id);
366
if (!xn_add_to_wait_for(db, &wf, thread))
371
/* The spinning version: */
372
while (loop_count < WAIT_SPIN_COUNT) {
378
/* This is a dirty read, but it should work! */
379
flags = wait_xact_ptr->xd_flags;
380
start = wait_xact_ptr->xd_start_xn_id;
383
if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, NULL))
384
flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
387
if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
388
/* The transaction has terminated! */
392
/* The non-spinning version:
 *
 * I believe I can avoid missing the wakeup signal
 * by locking before we check if the transaction
 * is still running.
 *
 * This works even though db->db_xn_wait_on_cond is a "dirty read".
 *
 * The reason is: before the signal is sent the
 * lock is also acquired. This is not possible until
 * this thread is safely sleeping.
 */
THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
405
my_wt = xt_thr_array[thread->t_id].td_waiting;
406
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
408
xt_lock_mutex_ns(&my_wt->wt_lock);
412
/* This is a dirty read, but it should work! */
413
flags = wait_xact_ptr->xd_flags;
414
start = wait_xact_ptr->xd_start_xn_id;
417
if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, NULL))
418
flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
421
if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
422
/* The transaction has terminated! */
425
xt_timed_wait_cond_ns(&my_wt->wt_cond, &my_wt->wt_lock, WAIT_FOR_XACT_TIME);
428
xt_unlock_mutex_ns(&my_wt->wt_lock);
432
xn_remove_from_wait_for(db, &wf, thread);
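/* Illustrative sketch (not part of the original source): a typical call into
 * xt_xn_wait_for_xact() when only the end of another transaction is of interest.
 * It assumes the lock-wait pointer may be NULL in that case; the helper name is
 * hypothetical.
 */
#if 0
static xtBool xn_example_wait_for_other_xact(XTThreadPtr thread, xtXactID other_xn_id)
{
	XTXactWaitRec xw;

	xw.xw_xn_id = other_xn_id;
	return xt_xn_wait_for_xact(thread, &xw, NULL);
}
#endif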
438
#else // XT_USE_SPINLOCK_WAIT_FOR
440
* The given thread must wait for the specified transaction to terminate. This
441
* function places the transaction of the thread on a list of waiting threads.
443
* Before waiting we make a check for deadlocks. A deadlock occurs
444
* if waiting would introduce a cycle.
446
xtPublic xtBool old_xt_xn_wait_for_xact(XTThreadPtr thread, xtXactID xn_id, xtBool will_retry, XTLockWaitFuncPtr pw_func, XTLockWaitPtr pw_data)
448
XTDatabaseHPtr db = thread->st_database;
453
ASSERT_NS(thread->st_xact_data);
455
thread->st_statistics.st_wait_for_xact++;
456
wf.wf_waiting_xn_id = thread->st_xact_data->xd_start_xn_id;
457
wf.wf_for_me_xn_id = xn_id;
458
wf.wf_thread_id = thread->t_id;
460
xt_lock_mutex_ns(&db->db_xn_wait_lock);
462
#ifdef TRACE_WAIT_FOR
463
xt_ttracef(thread, "T%lu -wait-> T%lu\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) xn_id);
466
if (!xn_get_xact_details(db, xn_id, thread, &flags, &start, NULL, NULL))
469
/* This is a dirty read, but it should work! */
470
if ((flags & XT_XN_XAC_ENDED) || start != xn_id)
473
if (xn_detect_deadlock(db, wf.wf_waiting_xn_id, wf.wf_for_me_xn_id))
476
/* We will wait for this transaction... */
477
db->db_xn_wait_count++;
478
if (thread->st_xact_writer)
479
db->db_xn_writer_wait_count++;
481
if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id, &wf)) {
482
db->db_xn_wait_count--;
486
if (!xn_get_xact_details(db, xn_id, thread, &flags, &start, NULL, NULL)) {
487
xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
488
db->db_xn_wait_count--;
489
if (thread->st_xact_writer)
490
db->db_xn_writer_wait_count--;
494
if ((flags & XT_XN_XAC_ENDED) || start != xn_id) {
495
xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
496
db->db_xn_wait_count--;
497
if (thread->st_xact_writer)
498
db->db_xn_writer_wait_count--;
502
db->db_xn_post_wait[thread->t_id].pw_call_me = pw_func;
503
db->db_xn_post_wait[thread->t_id].pw_thread = thread;
504
db->db_xn_post_wait[thread->t_id].pw_data = pw_data;
506
/* Timed wait because it is possible that transaction quits before
509
if (!xt_timed_wait_cond(NULL, &db->db_xn_wait_cond, &db->db_xn_wait_lock, 2 * 1000)) {
510
xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
511
db->db_xn_wait_count--;
512
if (thread->st_xact_writer)
513
db->db_xn_writer_wait_count--;
517
db->db_xn_post_wait[thread->t_id].pw_call_me = NULL;
518
xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
519
db->db_xn_wait_count--;
520
if (thread->st_xact_writer)
521
db->db_xn_writer_wait_count--;
527
#ifdef TRACE_WAIT_FOR
528
xt_ttracef(thread, "T%lu -wait-> T%lu DONE\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) xn_id);
530
xt_unlock_mutex_ns(&db->db_xn_wait_lock);
534
#ifdef TRACE_WAIT_FOR
535
xt_ttracef(self, "T%lu -wait-> T%lu FAILED\n", (u_long) self->st_xact_data->xd_start_xn_id, (u_long) xn_id);
537
xt_unlock_mutex_ns(&db->db_xn_wait_lock);
541
xtPublic void old_xt_xn_wakeup_transactions(XTDatabaseHPtr db, XTThreadPtr thread)
546
xt_lock_mutex_ns(&db->db_xn_wait_lock);
547
/* The idea here is to release the oldest transactions
 * first. Although this may not be completely fair
 * it has the advantage that older transactions are
 * encouraged to complete first.
 *
 * I have found the following problem with this test:
 * runTest(INCREMENT_TEST, 16, INCREMENT_TEST_UPDATE_COUNT);
 * with a bit of bad luck a transaction can be starved.
 * This results in the sweeper stalling because it is
 * waiting for an old transaction to quit so that
 * it can continue.
 *
 * Because the sweeper is waiting, the number of
 * versions of the record to be updated
 * begins to increase. In the above test over
 * 1600 transactions remain uncleaned.
 *
 * This means that there are 1600 versions of the
 * row which must be scanned to find the most
 * recent version.
 */
if ((len = (u_int) xt_sl_get_size(db->db_xn_wait_for))) {
569
for (u_int i=0; i<len; i++) {
570
wf = (XNWaitForPtr) xt_sl_item_at(db->db_xn_wait_for, i);
571
if (db->db_xn_post_wait[wf->wf_thread_id].pw_call_me) {
572
if (db->db_xn_post_wait[wf->wf_thread_id].pw_call_me(thread, &db->db_xn_post_wait[wf->wf_thread_id]))
573
db->db_xn_post_wait[wf->wf_thread_id].pw_call_me = NULL;
576
if (!xt_broadcast_cond_ns(&db->db_xn_wait_cond))
577
xt_log_and_clear_exception_ns();
579
ASSERT_NS(db->db_xn_wait_count == len);
580
xt_unlock_mutex_ns(&db->db_xn_wait_lock);
582
#endif // XT_USE_SPINLOCK_WAIT_FOR
584
/* ----------------------------------------------------------------------
592
u_long not_clean_max;
596
static void xn_free_xact(XTDatabaseHPtr db, XTXactSegPtr seg, XTXactDataPtr xact)
601
/* This indicates the structure is free: */
602
xact->xd_start_xn_id = 0;
603
if ((xtWord1 *) xact >= db->db_xn_data && (xtWord1 *) xact < db->db_xn_data_end) {
604
/* Put it in the free list: */
605
xact->xd_next_xact = seg->xs_free_list;
606
seg->xs_free_list = xact;
613
/*
 * GOTCHA: The value db->db_xn_curr_id may be a bit larger
 * than the actual transaction created because there is
 * a gap between the issue of the transaction ID
 * and the creation of a memory structure
 * (indicated here: {GAP-INC-ADD-XACT}).
 *
 * This function returns the actual current transaction ID.
 * This is the number of the last transaction actually
 * created in memory.
 *
 * This means that if you call xt_xn_get_xact() with any
 * number less than or equal to this value, not finding
 * the transaction means it has already ended!
 */
xtPublic xtXactID xt_xn_get_curr_id(XTDatabaseHPtr db)
631
register XTXactSegPtr seg = db->db_xn_idx;
633
/* Find the highest transaction ID actually created... */
634
curr_xn_id = seg->xs_last_xn_id;
636
for (i=1; i<XT_XN_NO_OF_SEGMENTS; i++, seg++) {
637
if (xt_xn_is_before(curr_xn_id, seg->xs_last_xn_id))
638
curr_xn_id = seg->xs_last_xn_id;
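/* Illustrative sketch (not part of the original source): how the rule described in the
 * comment above can be applied by a caller. It only combines functions declared in this
 * file; the helper name is hypothetical.
 */
#if 0
static xtBool xn_example_xact_has_ended(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
{
	int		flags;
	xtXactID	start;

	/* IDs above the actual current ID may simply not have a memory structure yet
	 * ({GAP-INC-ADD-XACT}), so they cannot be judged this way:
	 */
	if (xt_xn_is_before(xt_xn_get_curr_id(db), xn_id))
		return FALSE;
	/* For issued IDs, a missing structure means the transaction has already ended: */
	if (!xn_get_xact_details(db, xn_id, thread, &flags, &start, NULL, NULL))
		return TRUE;
	return (flags & XT_XN_XAC_ENDED) ? TRUE : FALSE;
}
#endif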
643
xtPublic XTXactDataPtr xt_xn_add_old_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
645
register XTXactDataPtr xact;
646
register XTXactSegPtr seg;
647
register XTXactDataPtr *hash;
650
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
651
XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
652
hash = &seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
655
if (xact->xd_start_xn_id == xn_id)
657
xact = xact->xd_next_xact;
660
if ((xact = seg->xs_free_list))
661
seg->xs_free_list = xact->xd_next_xact;
663
/* We have used up all the free transaction slots,
664
* the sweeper should work faster to free them
667
db->db_sw_faster |= XT_SW_NO_MORE_XACT_SLOTS;
668
if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
669
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
674
xact->xd_next_xact = *hash;
677
xact->xd_start_xn_id = xn_id;
678
xact->xd_end_xn_id = 0;
679
xact->xd_end_time = 0;
680
xact->xd_begin_log = 0;
683
/* Get the largest transaction id. */
684
if (xt_xn_is_before(seg->xs_last_xn_id, xn_id))
685
seg->xs_last_xn_id = xn_id;
688
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
691
if (tot_alloced > high_alloced)
692
high_alloced = tot_alloced;
697
static XTXactDataPtr xn_add_new_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
699
register XTXactDataPtr xact;
700
register XTXactSegPtr seg;
701
register XTXactDataPtr *hash;
704
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
705
XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
706
hash = &seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
708
if ((xact = seg->xs_free_list))
709
seg->xs_free_list = xact->xd_next_xact;
711
/* We have used up all the free transaction slots,
712
* the sweeper should work faster to free them
715
db->db_sw_faster |= XT_SW_NO_MORE_XACT_SLOTS;
716
if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
717
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
722
xact->xd_next_xact = *hash;
725
xact->xd_thread_id = thread->t_id;
726
xact->xd_start_xn_id = xn_id;
727
xact->xd_end_xn_id = 0;
728
xact->xd_end_time = 0;
729
xact->xd_begin_log = 0;
732
seg->xs_last_xn_id = xn_id;
733
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
736
if (tot_alloced > high_alloced)
737
high_alloced = tot_alloced;
742
static xtBool xn_get_xact_details(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), int *flags, xtXactID *start, xtWord4 *end, xtThreadID *thd_id)
744
register XTXactSegPtr seg;
745
register XTXactDataPtr xact;
746
xtBool found = FALSE;
748
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
749
XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
750
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
752
if (xact->xd_start_xn_id == xn_id) {
755
*flags = xact->xd_flags;
757
*start = xact->xd_start_xn_id;
759
*end = xact->xd_end_time;
761
*thd_id = xact->xd_thread_id;
764
xact = xact->xd_next_xact;
766
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
770
static xtBool xn_get_xact_pointer(XTDatabaseHPtr db, xtXactID xn_id, XTXactDataPtr *xact_ptr)
772
register XTXactSegPtr seg;
773
register XTXactDataPtr xact;
774
xtBool found = FALSE;
777
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
778
XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
779
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
781
if (xact->xd_start_xn_id == xn_id) {
783
/* We only return pointers to transaction structures that are permanently
786
if ((xtWord1 *) xact >= db->db_xn_data && (xtWord1 *) xact < db->db_xn_data_end)
790
xact = xact->xd_next_xact;
792
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
797
* Note, this function only returns TRUE if the transaction
798
* still needs to be cleaned.
800
static xtBool xn_get_xact_start(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), xtLogID *log_id, xtLogOffset *log_offset)
802
register XTXactSegPtr seg;
803
register XTXactDataPtr xact;
804
xtBool found = FALSE;
806
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
807
XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
808
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
810
if (xact->xd_start_xn_id == xn_id) {
811
/* Consider only transactions that have not been cleaned! */
812
if (!(xact->xd_flags & XT_XN_XAC_CLEANED))
815
*log_id = xact->xd_begin_log;
816
*log_offset = xact->xd_begin_offset;
820
xact = xact->xd_next_xact;
822
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
826
/* NOTE: this function may only be used by the sweeper or the recovery process. */
827
xtPublic XTXactDataPtr xt_xn_get_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread))
829
register XTXactSegPtr seg;
830
register XTXactDataPtr xact;
832
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
833
XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
834
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
836
if (xact->xd_start_xn_id == xn_id)
838
xact = xact->xd_next_xact;
840
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
845
* Delete a transaction, return TRUE if the transaction
848
xtPublic xtBool xt_xn_delete_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
850
XTXactDataPtr xact, pxact = NULL;
854
seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
855
XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
856
xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
858
if (xact->xd_start_xn_id == xn_id) {
860
pxact->xd_next_xact = xact->xd_next_xact;
862
seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE] = xact->xd_next_xact;
863
xn_free_xact(db, seg, xact);
864
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
868
xact = xact->xd_next_xact;
870
XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
874
//#define DEBUG_RAM_LIST
875
#ifdef DEBUG_RAM_LIST
877
#define DEBUG_RAM_LIST_SIZE 80
879
int check_ram_init_count = 0;
880
xt_rwlock_type check_ram_lock;
881
xtXactID check_ram_trns[DEBUG_RAM_LIST_SIZE];
884
static void check_ram_init(void)
886
if (check_ram_init_count == 0)
887
xt_init_rwlock(NULL, &check_ram_lock);
888
check_ram_init_count++;
891
static void check_ram_free(void)
893
check_ram_init_count--;
894
if (check_ram_init_count == 0)
895
xt_free_rwlock(&check_ram_lock);
898
static void check_ram_min_id(XTDatabaseHPtr db)
902
xt_slock_rwlock_ns(&check_ram_lock);
903
for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
904
if (check_ram_trns[i] && xt_xn_is_before(check_ram_trns[i], db->db_xn_min_ram_id)) {
905
/* This should never happen! */
909
for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
910
if (check_ram_trns[i]) {
911
x_ptr = xt_xn_get_xact(db, check_ram_trns[i]);
918
xt_unlock_rwlock_ns(&check_ram_lock);
921
static void check_ram_add(xtXactID xn_id)
925
xt_xlock_rwlock_ns(&check_ram_lock);
926
for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
927
if (!check_ram_trns[i]) {
928
check_ram_trns[i] = xn_id;
929
xt_unlock_rwlock_ns(&check_ram_lock);
933
xt_unlock_rwlock_ns(&check_ram_lock);
934
printf("DEBUG --- List too small\n");
937
static void check_ram_del(xtXactID xn_id)
941
xt_xlock_rwlock_ns(&check_ram_lock);
942
for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
943
if (check_ram_trns[i] == xn_id) {
944
check_ram_trns[i] = 0;
945
xt_unlock_rwlock_ns(&check_ram_lock);
949
xt_unlock_rwlock_ns(&check_ram_lock);
953
/* ----------------------------------------------------------------------
957
xtPublic void xt_xn_init_db(XTThreadPtr self, XTDatabaseHPtr db)
962
#ifdef DEBUG_RAM_LIST
965
xt_spinlock_init_with_autoname(self, &db->db_xn_id_lock);
966
xt_spinlock_init_with_autoname(self, &db->db_xn_wait_spinlock);
967
xt_init_mutex_with_autoname(self, &db->db_xn_xa_lock);
968
//xt_init_mutex_with_autoname(self, &db->db_xn_wait_lock);
969
//xt_init_cond(self, &db->db_xn_wait_cond);
970
xt_init_mutex_with_autoname(self, &db->db_sw_lock);
971
xt_init_cond(self, &db->db_sw_cond);
972
xt_init_mutex_with_autoname(self, &db->db_wr_lock);
973
xt_init_cond(self, &db->db_wr_cond);
975
/* Pre-allocate transaction data structures: */
976
db->db_xn_data = (xtWord1 *) xt_malloc(self, sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS);
977
db->db_xn_data_end = db->db_xn_data + sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS;
978
xact = (XTXactDataPtr) db->db_xn_data;
979
for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
980
seg = &db->db_xn_idx[i];
981
XT_XACT_INIT_LOCK(self, &seg->xs_tab_lock);
982
for (u_int j=0; j<XT_XN_DATA_ALLOC_COUNT; j++) {
983
xact->xd_next_xact = seg->xs_free_list;
984
seg->xs_free_list = xact;
989
/* Create a sorted list for XA transactions recovered: */
990
db->db_xn_xa_list = xt_new_sortedlist(self, sizeof(XTXactXARec), 100, 50, xt_xn_xa_compare, db, NULL, FALSE, FALSE);
992
/* Initialize the data logs: */
993
db->db_datalogs.dlc_init(self, db);
995
/* Setup the transaction log: */
996
db->db_xlog.xlog_setup(self, db, (off_t) xt_db_log_file_threshold, xt_db_transaction_buffer_size, xt_db_log_file_count);
998
db->db_xn_end_time = 1;
1000
/* Initializing the restart file also does
 * recovery. This returns the log position after recovery.
 *
 * This is the log position where the writer thread will
 * begin. The writer thread writes changes to the database that
 * have been flushed to the log.
 */
xt_xres_init(self, db);
1009
/* Initialize the "last transaction in memory", by default
1010
* this is the current transaction ID, which is the ID
1011
* of the last transaction.
1013
for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
1014
seg = &db->db_xn_idx[i];
1015
XT_XACT_INIT_LOCK(self, &seg->xs_tab_lock);
1016
seg->xs_last_xn_id = db->db_xn_curr_id;
1020
* The next transaction to clean is the lowest transaction
1023
db->db_xn_to_clean_id = db->db_xn_min_ram_id;
1024
#ifdef XT_SWEEPER_SORT_XACTS
1025
db->db_sw_to_add = db->db_xn_min_ram_id;
1029
* No transactions are running, so the minimum transaction
1030
* ID is the next one to run:
1032
db->db_xn_min_run_id = db->db_xn_curr_id + 1;
1034
db->db_xn_wait_for = xt_new_sortedlist(self, sizeof(XNWaitForRec), 100, 50, xn_compare_wait_for, db, xn_free_wait_for, FALSE, FALSE);
1037
xtPublic void xt_xn_exit_db(XTThreadPtr self, XTDatabaseHPtr db)
1040
printf("=========> MOST TXs CURR ALLOC: %lu\n", tot_alloced);
1041
printf("=========> MOST TXs HIGH ALLOC: %lu\n", high_alloced);
1042
printf("=========> MAX TXs NOT CLEAN: %lu\n", not_clean_max);
1043
printf("=========> MAX TXs IN RAM: %lu\n", in_ram_max);
1045
XTXactPreparePtr xap, xap_next;
1047
xt_stop_sweeper(self, db); // Should be done already!
1048
xt_stop_writer(self, db); // Should be done already!
1050
xt_xres_exit(self, db);
1051
db->db_xlog.xlog_exit(self);
1053
db->db_datalogs.dlc_exit(self);
1055
for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
1058
seg = &db->db_xn_idx[i];
1059
for (u_int j=0; j<XT_XN_HASH_TABLE_SIZE; j++) {
1060
XTXactDataPtr xact, nxact;
1062
xact = seg->xs_table[j];
1064
nxact = xact->xd_next_xact;
1065
xn_free_xact(db, seg, xact);
1069
XT_XACT_FREE_LOCK(self, &seg->xs_tab_lock);
1071
if (db->db_xn_wait_for) {
1072
xt_free_sortedlist(self, db->db_xn_wait_for);
1073
db->db_xn_wait_for = NULL;
1075
if (db->db_xn_data) {
1076
xt_free(self, db->db_xn_data);
1077
db->db_xn_data = NULL;
1078
db->db_xn_data_end = NULL;
1081
xt_free_cond(&db->db_wr_cond);
1082
xt_free_mutex(&db->db_wr_lock);
1083
xt_free_cond(&db->db_sw_cond);
1084
xt_free_mutex(&db->db_sw_lock);
1085
//xt_free_cond(&db->db_xn_wait_cond);
1086
//xt_free_mutex(&db->db_xn_wait_lock);
1087
xt_free_mutex(&db->db_xn_xa_lock);
1088
for (u_int i=0; i<XT_XA_HASH_TAB_SIZE; i++) {
1089
xap = db->db_xn_xa_table[i];
1091
xap_next = xap->xp_next;
1096
if (db->db_xn_xa_list) {
1097
xt_free_sortedlist(self, db->db_xn_xa_list);
1098
db->db_xn_xa_list = NULL;
1100
xt_spinlock_free(self, &db->db_xn_wait_spinlock);
1101
xt_spinlock_free(self, &db->db_xn_id_lock);
1102
#ifdef DEBUG_RAM_LIST
1107
xtPublic void xt_xn_init_thread(XTThreadPtr self, int what_for)
1109
ASSERT(self->st_database);
1111
if (!xt_init_row_lock_list(&self->st_lock_list))
1114
case XT_FOR_COMPACTOR:
1115
self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
1118
/* The writer does not need a transaction buffer. */
1119
self->st_dlog_buf.dlb_init(self->st_database, 0);
1121
case XT_FOR_SWEEPER:
1123
self->st_dlog_buf.dlb_init(self->st_database, 0);
1126
self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
1131
xtPublic void xt_xn_exit_thread(XTThreadPtr self)
1133
if (self->st_xact_data)
1134
xt_xn_rollback(self);
1135
self->st_dlog_buf.dlb_exit(self);
1136
xt_exit_row_lock_list(&self->st_lock_list);
1139
/* ----------------------------------------------------------------------
1140
* Begin and End Transactions
1143
xtPublic xtBool xt_xn_begin(XTThreadPtr self)
1145
XTDatabaseHPtr db = self->st_database;
1148
ASSERT(!self->st_xact_data);
1150
xt_spinlock_lock(&db->db_xn_id_lock);
1151
xn_id = ++db->db_xn_curr_id;
1152
xt_spinlock_unlock(&db->db_xn_id_lock);
1155
if (xt_xn_is_before(not_clean_max, xn_id - db->db_xn_to_clean_id))
1156
not_clean_max = xn_id - db->db_xn_to_clean_id;
1157
if (xt_xn_is_before(in_ram_max, xn_id - db->db_xn_min_ram_id))
1158
in_ram_max = xn_id - db->db_xn_min_ram_id;
1160
/* {GAP-INC-ADD-XACT} This is the gap between incrementing the ID,
1161
* and creating the transaction in memory.
1162
* See xt_xn_get_curr_id().
1165
if (!(self->st_xact_data = xn_add_new_xact(db, xn_id, self)))
1167
self->st_xact_writer = FALSE;
1169
/* All transactions that committed before or at this time
 * are visible to this one: */
self->st_visible_time = db->db_xn_end_time;
1173
#ifdef TRACE_TRANSACTION
1174
xt_ttracef(self, "BEGIN T%lu\n", (u_long) self->st_xact_data->xd_start_xn_id);
1176
#ifdef XT_TRACK_CONNECTIONS
1177
xt_track_conn_info[self->t_id].ci_curr_xact_id = self->st_xact_data->xd_start_xn_id;
1178
xt_track_conn_info[self->t_id].ci_xact_start = xt_trace_clock();
1183
static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
1188
ASSERT_NS(thread->st_xact_data);
1189
if ((xact = thread->st_xact_data)) {
1190
XTDatabaseHPtr db = thread->st_database;
1191
xtXactID xn_id = xact->xd_start_xn_id;
1194
if ((writer = thread->st_xact_writer)) {
1195
/* The transaction wrote something: */
1196
XTXactEndEntryDRec entry;
1199
sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
1200
entry.xe_status_1 = status;
1201
entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
1202
XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
1203
XT_SET_DISK_4(entry.xe_not_used_4, 0);
1205
#ifdef XT_IMPLEMENT_NO_ACTION
1206
/* This will check any restrictions that have been delayed to the end of the statement. */
1207
if (thread->st_restrict_list.bl_count) {
1208
if (!xt_tab_restrict_rows(&thread->st_restrict_list, thread)) {
1210
status = XT_LOG_ENT_ABORT;
1215
/* Flush the data log: */
1216
if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
1218
status = XT_LOG_ENT_ABORT;
1221
/* Write and flush the transaction log:
1222
* We only flush if this was not a temp table.
1224
if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, thread->st_non_temp_opened ? XT_XLOG_NO_WRITE_NO_FLUSH : xt_db_flush_log_at_trx_commit)) {
1226
status = XT_LOG_ENT_ABORT;
1227
/* Make sure this is done, if we failed to log
 * the transaction end!
 */
if (thread->st_xact_writer) {
1231
/* Adjust this in case of error, but don't forget
1234
xt_spinlock_lock(&db->db_xlog.xl_buffer_lock);
1235
db->db_xn_writer_count--;
1236
thread->st_xact_writer = FALSE;
1237
if (thread->st_xact_long_running) {
1238
db->db_xn_long_running_count--;
1239
thread->st_xact_long_running = FALSE;
1241
xt_spinlock_unlock(&db->db_xlog.xl_buffer_lock);
1245
/* Setting this flag completes the transaction.
 * Do this before we release the locks, because
 * the unlocked transactions expect the
 * transaction they are waiting for to be
 * complete!
 */
xact->xd_end_time = ++db->db_xn_end_time;
1252
if (status == XT_LOG_ENT_COMMIT) {
1253
thread->st_statistics.st_commits++;
1254
xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
1257
thread->st_statistics.st_rollbacks++;
1258
xact->xd_flags |= XT_XN_XAC_ENDED;
1261
/* {REMOVE-LOCKS} Drop locks if you have any: */
1262
thread->st_lock_list.xt_remove_all_locks(db, thread);
1264
/* Do this afterwards to make sure the sweeper
 * does not start cleaning up transactions
 * before any transactions that were waiting for
 * this transaction have completed!
 */
xact->xd_end_xn_id = db->db_xn_curr_id;
1271
/* Now you can sweep! */
1272
ASSERT_NS(xact->xd_flags & XT_XN_XAC_LOGGED);
1273
xact->xd_flags |= XT_XN_XAC_SWEEP;
1276
/* Read-only transactions can be removed immediately. */
1277
xact->xd_end_time = ++db->db_xn_end_time;
1278
xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
1280
/* Drop locks if you have any: */
1281
thread->st_lock_list.xt_remove_all_locks(db, thread);
1283
xact->xd_end_xn_id = db->db_xn_curr_id;
1285
ASSERT_NS(!(xact->xd_flags & XT_XN_XAC_LOGGED));
1286
xact->xd_flags |= XT_XN_XAC_SWEEP;
1288
if (xt_xn_delete_xact(db, xn_id, thread)) {
1289
if (db->db_xn_min_ram_id == xn_id)
1290
db->db_xn_min_ram_id = xn_id+1;
1294
#ifdef TRACE_TRANSACTION
1295
if (status == XT_LOG_ENT_COMMIT)
1296
xt_ttracef(thread, "COMMIT T%lu\n", (u_long) xn_id);
1298
xt_ttracef(thread, "ABORT T%lu\n", (u_long) xn_id);
1301
if (db->db_xn_min_run_id == xn_id)
1302
db->db_xn_min_run_id = xn_id+1;
1304
thread->st_xact_data = NULL;
1306
#ifdef XT_TRACK_CONNECTIONS
1307
xt_track_conn_info[thread->t_id].ci_prev_xact_id = xt_track_conn_info[thread->t_id].ci_curr_xact_id;
1308
xt_track_conn_info[thread->t_id].ci_prev_xact_time = xt_trace_clock() - xt_track_conn_info[thread->t_id].ci_xact_start;
1309
xt_track_conn_info[thread->t_id].ci_curr_xact_id = 0;
1310
xt_track_conn_info[thread->t_id].ci_xact_start = 0;
1313
xt_wakeup_waiting_threads(thread);
1315
/* {WAKE-SW} Waking the sweeper
1316
* is no longer unconditional.
1317
* (see all comments to {WAKE-SW})
1319
* We now wake the sweeper if it is
1320
* supposed to work faster.
1322
* There are now 2 cases:
1323
* - We run out of transaction slots.
1324
* - We encounter old index entries.
1326
* The following test:
1327
* runTest(INCREMENT_TEST, 16, INCREMENT_TEST_UPDATE_COUNT);
1328
* has extreme problems with sweeping every 1/10s
1329
* because a huge number of index entries accumulate
1330
* that need to be cleaned.
1332
* New code detects this case.
1334
if (db->db_sw_faster)
1335
xt_wakeup_sweeper(db);
1337
/* Don't get too far ahead of the sweeper! */
1339
#ifdef XT_WAIT_FOR_CLEANUP
1340
xtXactID wait_xn_id;
1342
/* This is the transaction that was committed 3 transactions ago: */
1343
wait_xn_id = thread->st_prev_xact[thread->st_last_xact];
1344
thread->st_prev_xact[thread->st_last_xact] = xn_id;
1345
/* This works because XT_MAX_XACT_BEHIND == 2! */
1346
ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == (thread->st_last_xact ^ 1));
1347
thread->st_last_xact ^= 1;
1348
while (xt_xn_is_before(db->db_xn_to_clean_id, wait_xn_id) && (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)) {
1349
#ifdef XT_SWEEPER_SORT_XACTS
1350
if (!xn_get_xact_start(db, wait_xn_id, thread, NULL, NULL))
1356
if ((db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) != 0) {
1357
xtWord8 then = xt_trace_clock() + (xtWord8) 20000000;
1361
if (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)
1363
if (xt_trace_clock() >= then)
1373
xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
1375
return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
1378
xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
1380
return xn_end_xact(thread, XT_LOG_ENT_ABORT);
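/* Illustrative sketch (not part of the original source): the begin/end pairing expected
 * by this module. The helper and the work done inside the transaction are hypothetical;
 * a real caller would roll back on failure between begin and commit.
 */
#if 0
static xtBool xn_example_run_in_xact(XTThreadPtr self)
{
	if (!xt_xn_begin(self))
		return FALSE;
	/* ... perform the table updates of the statement here ... */
	if (!xt_xn_commit(self))
		return FALSE;
	return TRUE;
}
#endif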
1383
xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
1385
XTXactNewTabEntryDRec entry;
1387
entry.xt_status_1 = XT_LOG_ENT_NEW_TAB;
1388
entry.xt_checksum_1 = XT_CHECKSUM_1(tab_id);
1389
XT_SET_DISK_4(entry.xt_tab_id_4, tab_id);
1390
return xt_xlog_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_WRITE_AND_FLUSH);
1393
xtPublic int xt_xn_status(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID XT_UNUSED(rec_id))
1395
register XTThreadPtr self = ot->ot_thread;
1400
/* Conditional waste of time!
1401
* Drizzle has strict warnings.
1402
* I know this is not necessary!
1407
if (xn_id == self->st_xact_data->xd_start_xn_id)
1408
return XT_XN_MY_UPDATE;
1409
if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id) ||
1410
!xn_get_xact_details(self->st_database, xn_id, ot->ot_thread, &flags, NULL, &end, NULL)) {
1411
/* Not in RAM, rollback done: */
1412
//*DBG*/xt_dump_xlogs(self->st_database, 0);
1413
//*DBG*/xt_check_table(self, ot);
1414
//*DBG*/xt_dump_trace();
1415
/* {XACT-NOT-IN-RAM}
 * This should never happen (CHANGED, see below)!
 *
 * Because if the transaction is no longer in RAM, then it has been
 * cleaned up. So the record should be marked as clean, or not
 * exist at all.
 *
 * After sweeping, we wait for all transactions to quit that were
 * running at the time of cleanup before removing the transaction record.
 * (see {XACT-NOT-IN-RAM})
 *
 * If this was not the case, then we could be here because:
 * - The user transaction (T2) reads record x and notes that the record
 *   has not been cleaned (CLEAN bit not set).
 * - The sweeper is busy sweeping the transaction (T1) that created
 *   record x.
 *   The SW sets the CLEAN bit on record x, and then schedules T1 for
 *   deletion.
 *
 * Now T1 should not be deleted before T2 quits. If it does happen
 * then we land up here.
 *
 * THIS CAN NOW HAPPEN!
 *
 * First of all, a MYSTERY:
 * This did happen, despite the description above! The reason why
 * is left as an exercise to the reader (really, I don't know why!)
 *
 * This has forced me to add code to handle the situation. This
 * is done by re-reading the record that is being checked by this
 * function. After re-reading, the record should either be
 * invalid (free) or clean (CLEAN bit set).
 *
 * If this is the case, then we will not land up here
 * again.
 *
 * Because we are only here because the record was valid but not
 * clean (you can confirm this by looking at the code that
 * calls this function).
 */
return XT_XN_REREAD;
1458
if (!(flags & XT_XN_XAC_ENDED))
1459
/* Transaction not ended, may be visible. */
1460
return XT_XN_OTHER_UPDATE;
1461
/* Visible if the transaction was committed: */
1462
if (flags & XT_XN_XAC_COMMITTED) {
1463
if (!xt_xn_is_before(self->st_visible_time, end)) // was self->st_visible_time >= xact->xd_end_time
1464
return XT_XN_VISIBLE;
1465
return XT_XN_NOT_VISIBLE;
1467
return XT_XN_ABORTED;
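/* Illustrative sketch (not part of the original source): how a reader can act on the
 * status codes returned above. The switch labels are the values actually returned by
 * xt_xn_status(); the surrounding helper is hypothetical.
 */
#if 0
static xtBool xn_example_record_visible(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID rec_id, xtBool *retry_read)
{
	*retry_read = FALSE;
	switch (xt_xn_status(ot, xn_id, rec_id)) {
		case XT_XN_VISIBLE:
		case XT_XN_MY_UPDATE:
			return TRUE;
		case XT_XN_REREAD:
			/* Re-read the record header and check again (see {XACT-NOT-IN-RAM} above): */
			*retry_read = TRUE;
			return FALSE;
		case XT_XN_NOT_VISIBLE:
		case XT_XN_OTHER_UPDATE:
		case XT_XN_ABORTED:
		default:
			return FALSE;
	}
}
#endif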
1470
/* ----------------------------------------------------------------------
1474
xtPublic int xt_xn_xa_compare(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
1476
xtXactID *x = (xtXactID *) a;
1477
XTXactXAPtr y = (XTXactXAPtr) b;
1479
if (*x == y->xx_xact_id)
1481
if (xt_xn_is_before(*x, y->xx_xact_id))
1486
xtPublic xtBool xt_xn_prepare(int len, xtWord1 *xa_data, XTThreadPtr thread)
1490
ASSERT_NS(thread->st_xact_data);
1491
if ((xact = thread->st_xact_data)) {
1492
xtXactID xn_id = xact->xd_start_xn_id;
1494
/* Only makes sense if the transaction has already been logged: */
1495
if ((thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
1496
if (!xt_xlog_modify_table(0, XT_LOG_ENT_PREPARE, xn_id, 0, 0, 0, len, xa_data, thread))
1503
xtPublic xtBool xt_xn_store_xa_data(XTDatabaseHPtr db, xtXactID xact_id, int len, xtWord1 *xa_data, XTThreadPtr XT_UNUSED(thread))
1505
XTXactPreparePtr xap;
1509
if (!(xap = (XTXactPreparePtr) xt_malloc_ns(offsetof(XTXactPrepareRec, xp_xa_data) + len)))
1511
xap->xp_xact_id = xact_id;
1512
xap->xp_hash = xt_get_checksum4(xa_data, len);
1513
xap->xp_data_len = len;
1514
memcpy(xap->xp_xa_data, xa_data, len);
1515
xx.xx_xact_id = xact_id;
1518
idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
1519
xt_lock_mutex_ns(&db->db_xn_xa_lock);
1520
if (!xt_sl_insert(NULL, db->db_xn_xa_list, &xact_id, &xx)) {
1521
xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1524
xap->xp_next = db->db_xn_xa_table[idx];
1525
db->db_xn_xa_table[idx] = xap;
1526
xt_unlock_mutex_ns(&db->db_xn_xa_lock);
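/* Illustrative sketch (not part of the original source): the intended lifecycle of the
 * XA bookkeeping above, using only functions defined in this file. The helper name is
 * hypothetical.
 */
#if 0
static void xn_example_xa_remember_and_forget(XTDatabaseHPtr db, xtXactID xact_id, int len, xtWord1 *xa_data, XTThreadPtr thread)
{
	/* Remember the XA data of a prepared transaction... */
	if (!xt_xn_store_xa_data(db, xact_id, len, xa_data, thread))
		return;
	/* ...and discard it again once the transaction has been committed or rolled back: */
	xt_xn_delete_xa_data_by_xact(db, xact_id, thread);
}
#endif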
1530
xtPublic void xt_xn_delete_xa_data_by_xact(XTDatabaseHPtr db, xtXactID xact_id, XTThreadPtr thread)
1534
xt_lock_mutex_ns(&db->db_xn_xa_lock);
1535
if (!(xx = (XTXactXAPtr) xt_sl_find(NULL, db->db_xn_xa_list, &xact_id)))
1537
xt_xn_delete_xa_data(db, xx->xx_xa_ptr, TRUE, thread);
1540
xtPublic void xt_xn_delete_xa_data(XTDatabaseHPtr db, XTXactPreparePtr xap, xtBool unlock, XTThreadPtr XT_UNUSED(thread))
1543
XTXactPreparePtr xap_ptr, xap_pptr = NULL;
1545
xt_sl_delete(NULL, db->db_xn_xa_list, &xap->xp_xact_id);
1546
idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
1547
xap_ptr = db->db_xn_xa_table[idx];
1552
xap_ptr = xap_ptr->xp_next;
1556
xap_pptr->xp_next = xap_ptr->xp_next;
1558
db->db_xn_xa_table[idx] = xap_ptr->xp_next;
1562
xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1565
xtPublic XTXactPreparePtr xt_xn_find_xa_data(XTDatabaseHPtr db, int len, xtWord1 *xa_data, xtBool lock, XTThreadPtr XT_UNUSED(thread))
1568
XTXactPreparePtr xap;
1572
xt_lock_mutex_ns(&db->db_xn_xa_lock);
1573
hash = xt_get_checksum4(xa_data, len);
1574
idx = hash % XT_XA_HASH_TAB_SIZE;
1575
xap = db->db_xn_xa_table[idx];
1577
if (xap->xp_hash == hash &&
1578
xap->xp_data_len == len &&
1579
memcmp(xap->xp_xa_data, xa_data, len) == 0) {
1588
xtPublic XTXactPreparePtr xt_xn_enum_xa_data(XTDatabaseHPtr db, XTXactEnumXAPtr exa)
1592
if (!exa->exa_locked) {
1593
xt_lock_mutex_ns(&db->db_xn_xa_lock);
1594
exa->exa_locked = TRUE;
1597
if ((xx = (XTXactXAPtr) xt_sl_item_at(db->db_xn_xa_list, exa->exa_index))) {
1599
return xx->xx_xa_ptr;
1602
if (exa->exa_locked) {
1603
exa->exa_locked = FALSE;
1604
xt_unlock_mutex_ns(&db->db_xn_xa_lock);
1609
/* ----------------------------------------------------------------------
1610
* S W E E P E R F U N C T I O N S
1613
xtPublic xtWord8 xt_xn_bytes_to_sweep(XTDatabaseHPtr db, XTThreadPtr thread)
1616
xtXactID curr_xn_id;
1617
xtLogID xn_log_id = 0;
1618
xtLogOffset xn_log_offset = 0;
1619
xtLogID x_log_id = 0;
1620
xtLogOffset x_log_offset = 0;
1622
xtLogOffset log_offset;
1623
xtWord8 byte_count = 0;
1625
xn_id = db->db_xn_to_clean_id;
1626
curr_xn_id = xt_xn_get_curr_id(db);
1627
// Limit the number of transactions checked!
1628
for (int i=0; i<1000; i++) {
1629
if (xt_xn_is_before(curr_xn_id, xn_id))
1631
if (xn_get_xact_start(db, xn_id, thread, &x_log_id, &x_log_offset)) {
1633
if (xt_comp_log_pos(x_log_id, x_log_offset, xn_log_id, xn_log_offset) < 0) {
1634
xn_log_id = x_log_id;
1635
xn_log_offset = x_log_offset;
1639
xn_log_id = x_log_id;
1640
x_log_offset = x_log_offset;
1648
/* Assume the logs have the threshold: */
1649
log_id = db->db_xlog.xl_write_log_id;
1650
log_offset = db->db_xlog.xl_write_log_offset;
1651
if (xn_log_id < log_id) {
1652
if (xn_log_offset < xt_db_log_file_threshold)
1653
byte_count = (size_t) (xt_db_log_file_threshold - xn_log_offset);
1657
while (xn_log_id < log_id) {
1658
byte_count += (size_t) xt_db_log_file_threshold;
1661
if (xn_log_offset < log_offset)
1662
byte_count += (size_t) (log_offset - xn_log_offset);
1667
/* ----------------------------------------------------------------------
1668
* S W E E P E R P R O C E S S
1671
typedef struct XNSweeperState {
1672
XTDatabaseHPtr ss_db;
1673
XTXactSeqReadRec ss_seqread;
1674
XTDataBufferRec ss_databuf;
1676
XTBasicQueueRec ss_to_free;
1677
xtBool ss_flush_pending;
1678
xtTableID ss_not_found; /* Cache the last table not found, this saves time. */
1679
xtTableID ss_not_recovered; /* Cache the last table not recovered. */
1680
XTOpenTablePtr ss_ot;
1681
} XNSweeperStateRec, *XNSweeperStatePtr;
1684
/*
 * This function returns NULL if the table cannot be opened.
 * In this case *skip_cleanup will be set to TRUE
 * if the cleanup should be skipped.
 */
static XTOpenTablePtr xn_sw_get_open_table(XTThreadPtr self, XNSweeperStatePtr ss, xtTableID tab_id, xtBool *skip_cleanup)
1692
if (ss->ss_ot->ot_table->tab_id == tab_id)
1695
xt_db_return_table_to_pool(self, ss->ss_ot);
1699
if (ss->ss_not_found == tab_id || ss->ss_not_recovered == tab_id) {
1700
*skip_cleanup = TRUE;
1707
if (!(ss->ss_ot = xt_db_open_pool_table(self, ss->ss_db, tab_id, &r, TRUE))) {
1709
case XT_TAB_NOT_FOUND:
1710
/* Remember the table if it was not found: */
1711
ss->ss_not_found = tab_id;
1712
*skip_cleanup = TRUE;
1714
case XT_TAB_NO_DICTIONARY:
1715
case XT_TAB_POOL_CLOSED:
1716
*skip_cleanup = FALSE;
1719
*skip_cleanup = TRUE;
1725
/* Don't sweep transactions for tables that have not been
 * recovered yet:
 */
if (ss->ss_ot->ot_table->tab_recovery_not_done) {
1729
xt_db_return_table_to_pool(self, ss->ss_ot);
1731
ss->ss_not_recovered = tab_id;
1732
*skip_cleanup = TRUE;
1740
static void xn_sw_close_open_table(XTThreadPtr self, XNSweeperStatePtr ss)
1743
xt_db_return_table_to_pool(self, ss->ss_ot);
1749
* A thread can set a bit in db_sw_faster to make
1750
* the sweeper go faster.
1752
static void xn_sw_could_go_faster(XTThreadPtr self, XTDatabaseHPtr db)
1754
if (db->db_sw_faster) {
1755
if (!db->db_sw_fast) {
1756
xt_set_priority(self, xt_db_sweeper_priority+1);
1757
db->db_sw_fast = TRUE;
1762
static void xn_sw_go_slower(XTThreadPtr self, XTDatabaseHPtr db)
1764
if (db->db_sw_fast) {
1765
xt_set_priority(self, xt_db_sweeper_priority);
1766
db->db_sw_fast = FALSE;
1768
db->db_sw_faster = XT_SW_WORK_NORMAL;
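/* Illustrative sketch (not part of the original source): how other threads make use of
 * db_sw_faster, using only the flag and the wakeup call that appear elsewhere in this
 * file. The helper name is hypothetical.
 */
#if 0
static void xn_example_prod_sweeper(XTDatabaseHPtr db)
{
	/* Tell the sweeper that it is falling behind... */
	db->db_sw_faster |= XT_SW_NO_MORE_XACT_SLOTS;
	/* ...and wake it, in case it is sleeping: */
	xt_wakeup_sweeper(db);
}
#endif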
1771
/* Add a record to the "to free" queue. We note the current
1772
* transaction at the time this is done. The record will
1773
* only be freed once this transaction terminated, together
1774
* with all transactions that started before it!
1776
* The reason for this is that a sequential scan or some
1777
* other operation may read a committed record which is no longer
1778
* valid because it is no longer the latest variation (the first
1779
* variation reachable from the row pointer).
1781
* In this case, the sweeper will free the variation.
1782
* If the variation is re-used and committed before
1783
* the sequential scan or read completes, and by some
1784
* fluke is used by the same record as previously,
1785
* the system will think the record is valid
1788
* Without re-reading the record the sequential
1789
* scan or other read will find it on the variation list, and
1790
* return the record data as if valid!
1792
* ------------ 2008-01-03
1794
* An example of this is:
1796
* Assume we have 3 records.
1797
* The 3rd record is deleted, and committed.
1798
* Before cleanup can be performed
1799
* a sequential scan takes a copy of the records.
1801
* Now assume a new insert is done before
1802
* the sequential scan gets to the 3rd record.
1804
* The insert allocates the 3rd row and 3rd record
1807
* Now, when the sequential scan gets to the old copy of the 3rd record,
1808
* this is valid because the row points to this record again.
1810
* HOWEVER! I have now changed the sequential scan so that it accesses
1811
* the records from the cache, without making a copy.
1813
* This means that this problem cannot occur because the sequential scan
1814
* always reads the current data from the cache.
1816
* There is also no race condition (although no lock is taken), because
1817
* the record is written before the row (see here [(5)]).
1819
* This means that the row does not point to the record before the
1820
* record has been modified.
1822
* Once the record has been modified then the sequential scan will see
1823
* that the record belongs to a new transaction.
1825
* If the row pointer was set before the record updated then a race
1826
* condition would exist when the sequential scan reads the record
1827
* after the insert has updated the row pointer but before it has
1828
* changed the record.
1832
* I believe I can remove the delayed free record!
1834
* This means I can combine the REMOVE and FREE operations.
1836
* This is good because this takes care of the problem
1837
* that records are lost when:
1839
* The server crashes when the delayed free list still has items on it.
1841
* The transaction that freed the records has been cleaned, and this
1842
* fact has been committed to the log.
1844
* So I have removed the delay here: [(6)]
1846
* ------------ 2008-12-03
1848
* This code to delay removal of records was finally removed (see above)
1852
* As above, but instead a transaction is added to the "to free" queue.
1854
* It is important that transactions remain in memory until all
1855
* currently running transactions have ended. This is because
1856
* sequential and index scans have copies of old data.
1858
* In the old data a record may not be indicated as cleaned. Such
1859
* a record is considered invalid if the transaction is not in RAM.
1863
* And this problem is demonstrated by the following example
1864
* which was derived from flush_table.test.
1866
* Each handler command below is a separate transaction.
1867
* However the buffer is loaded by 'read first'.
1868
* Depending on when cleanup occurs, records can disappear
1869
* in some of the next commands.
1871
* 2 solutions for the test. Use begin ... commit around
1872
* handler open ... close. Or use analyze table t1 before
1873
* open. analyze table waits for the sweeper to complete!
1875
* create table dummy(table_id char(20) primary key);
1879
* drop table if exists t1;
1880
* create table t1(table_id char(20) primary key);
1881
* insert into t1 values ('Record-01');
1882
* insert into t1 values ('Record-02');
1883
* insert into t1 values ('Record-03');
1884
* insert into t1 values ('Record-04');
1885
* insert into t1 values ('Record-05');
1887
* handler t1 read first limit 1;
1888
* handler t1 read next limit 1;
1889
* handler t1 read next limit 1;
1890
* handler t1 read next limit 1;
1897
#ifdef MUST_DELAY_REMOVE
1898
static void xn_sw_add_xact_to_free(XTThreadPtr self, XNSweeperStatePtr ss, xtXactID xn_id)
1900
XNSWToFreeItemRec free_item;
1902
if ((ss->ss_to_free.bq_front - ss->ss_to_free.bq_back) >= XT_TN_MAX_TO_FREE) {
1903
/* If the queue is full, try to free some items:
1904
* We use the call count to avoid doing this every time,
1905
* when the queue overflows!
1907
if ((ss->ss_call_cnt % XT_TN_MAX_TO_FREE_CHECK) == 0)
1908
/* GOTCHA: This call was not locking the sweeper,
1909
* this could cause failure, of course:
1911
xn_sw_service_to_free(self, ss, TRUE);
1915
free_item.ri_wait_xn_id = ss->ss_db->db_xn_curr_id;
1916
free_item.ri_tab_id = 0;
1917
free_item.x.ri_xn_id = xn_id;
1919
xt_bq_add(self, &ss->ss_to_free, &free_item);
1923
static void xt_sw_delete_variations(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtXactID xn_id)
1925
xtRecordID prev_var_rec_id;
1928
switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, FALSE, row_id, xn_id)) {
1935
rec_id = prev_var_rec_id;
1939
static void xt_sw_delete_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtBool clean_delete, xtRowID row_id, xtXactID xn_id)
1941
xtRecordID prev_var_rec_id;
1943
switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, clean_delete, row_id, xn_id)) {
1954
/* Set rec_type to this value in order to force cleanup, without
1957
#define XN_FORCE_CLEANUP XT_TAB_STATUS_FREED
1960
* Read the record to be cleaned. Return TRUE if the cleanup has already been done.
1962
static xtBool xn_sw_cleanup_done(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtXactID xn_id, u_int rec_type, u_int stat_id, xtRowID row_id, XTTabRecHeadDPtr rec_head)
1964
if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) rec_head))
1967
if (rec_type == XN_FORCE_CLEANUP) {
1968
if (XT_REC_IS_FREE(rec_head->tr_rec_type_1))
1972
/* Transaction must match: */
1973
if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
1976
/* Record header must match expected value from
1977
* log or clean has been done, or is not required.
1979
* For example, it is not required if a record
1980
* has been overwritten in a transaction.
1982
if (rec_head->tr_rec_type_1 != rec_type ||
1983
rec_head->tr_stat_id_1 != stat_id)
1986
/* Row must match: */
1987
if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
1994
static void xn_sw_clean_indices(XTThreadPtr XT_NDEBUG_UNUSED(self), XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtWord1 *rec_data, xtWord1 *rec_buffer)
1996
XTTableHPtr tab = ot->ot_table;
2000
if (!tab->tab_dic.dic_key_count)
2003
cols_req = tab->tab_dic.dic_ind_cols_req;
2004
if (XT_REC_IS_FIXED(rec_data[0]))
2005
rec_buffer = rec_data + XT_REC_FIX_HEADER_SIZE;
2007
if (XT_REC_IS_VARIABLE(rec_data[0])) {
2008
if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buffer, cols_req))
2011
else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
2013
if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
2014
if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buffer, cols_req))
2018
if (rec_data != ot->ot_row_rbuffer)
2019
memcpy(ot->ot_row_rbuffer, rec_data, tab->tab_dic.dic_rec_size);
2020
if (!xt_tab_load_ext_data(ot, rec_id, rec_buffer, cols_req))
2025
/* This is possible, the record has already been cleaned up. */
2029
ind = tab->tab_dic.dic_keys;
2030
for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, ind++) {
2031
if (!xt_idx_update_row_id(ot, *ind, rec_id, row_id, rec_buffer))
2032
xt_log_and_clear_exception_ns();
2037
xt_log_and_clear_exception_ns();
2041
* Return TRUE if the cleanup was done. FAILED if cleanup could not be done
2042
* because dictionary information is not available.
2044
static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact, xtTableID tab_id, xtRecordID rec_id, u_int status, u_int rec_type, u_int stat_id, xtRowID row_id, xtWord1 *rec_buf)
2048
XTTabRecHeadDRec rec_head;
2049
xtRecordID after_rec_id;
2051
xtBool skip_cleanup;
2053
if (!(ot = xn_sw_get_open_table(self, ss, tab_id, &skip_cleanup)))
2054
/* The table no longer exists, consider cleanup done: */
2055
return skip_cleanup;
2058
ASSERT_NS(ot->ot_thread == self);
2060
/* Make sure the buffer is large enough! */
2061
xt_db_set_size(self, &ss->ss_databuf, (size_t) tab->tab_dic.dic_mysql_buf_size);
2063
xn_id = xact->xd_start_xn_id;
2064
if (xact->xd_flags & XT_XN_XAC_COMMITTED) {
2065
/* The transaction has been committed. Clean the record and
2066
* remove variations no longer in use.
2069
case XT_LOG_ENT_REC_MODIFIED:
2070
case XT_LOG_ENT_UPDATE:
2071
case XT_LOG_ENT_UPDATE_FL:
2072
case XT_LOG_ENT_UPDATE_BG:
2073
case XT_LOG_ENT_UPDATE_FL_BG:
2074
if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
2076
after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
2077
xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
2078
rec_head.tr_rec_type_1 |= XT_TAB_STATUS_CLEANED_BIT;
2079
XT_SET_NULL_DISK_4(rec_head.tr_prev_rec_id_4);
2080
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_CLEANED, 0, rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head))
2082
xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
2084
case XT_LOG_ENT_INSERT:
2085
case XT_LOG_ENT_INSERT_FL:
2086
case XT_LOG_ENT_INSERT_BG:
2087
case XT_LOG_ENT_INSERT_FL_BG: {
2090
* DROP TABLE IF EXISTS t1;
2091
* CREATE TABLE t1 ( id int, name varchar(300)) engine=pbxt;
2094
* insert t1(id, name) values(1, "aaa");
2095
* update t1 set name=REPEAT('A', 300) where id = 1;
2100
* Because the type of record changes, from VARIABLE to
2101
* EXTENDED, the cleanup needs to take this into account.
2103
* The input new status value which is written here
2104
* depends on the first write to the record.
2105
* However, the second write changes the record status.
2107
* Previously we used an OR function to write the bit and
* return the byte value of the result.
*
* The write function now checks the record to be written
* to make sure it matches the record that needs to be
* cleaned. So OR'ing the bit is no longer required.
2116
* We have changed this to fix the following bug:
2120
* T2 insert record 100 in row 50
2122
* T1 updates row 50 and adds record 101
2124
* The sweeper does cleanup in order T1, T2, ...
2126
* The sweeper cleans T1 by removing record 100 from the
2127
* row 50 variation list.
2128
* This means that record 100 is free.
2130
* The sweeper cleans T2 by marking record 100 as clean.
2131
* !BUG! record 100 has already been freed!
2133
* To avoid this we have to check a record before
2134
* cleaning (as we do above for update in xn_sw_cleanup_done())
2135
* We check that the record is, in fact, the exact
2136
* record that was inserted.
2138
* This is now done be xt_tc_write_cond().
2142
rec_head.tr_rec_type_1 = rec_type | XT_TAB_STATUS_CLEANED_BIT;
2143
if(!tab->tab_recs.xt_tc_write_cond(self, ot->ot_rec_file, rec_id, rec_head.tr_rec_type_1, &op_seq, xn_id, row_id, stat_id, rec_type))
2144
/* this means record was not updated by xt_tc_write_bor and doesn't need to */
2146
if (!xt_xlog_modify_table(tab->tab_id, XT_LOG_ENT_REC_CLEANED_1, op_seq, 0, 0, rec_id, 1, &rec_head.tr_rec_type_1, self))
2148
xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
2151
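/* In short: a committed insert is cleaned by conditionally marking the record
 * as cleaned (only if it still matches the inserted record), logging a
 * one-byte XT_LOG_ENT_REC_CLEANED_1 change, and then updating the indices.
 */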
case XT_LOG_ENT_DELETE:
case XT_LOG_ENT_DELETE_FL:
case XT_LOG_ENT_DELETE_BG:
case XT_LOG_ENT_DELETE_FL_BG:
if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
xt_sw_delete_variation(self, ss, ot, rec_id, TRUE, row_id, xn_id);
if (!xt_tab_free_row(ot, tab, row_id))
/* The transaction has been aborted. Remove the variation from the
* variation list. If this means the list is empty, then remove
* the record as well.
xtRecordID first_rec_id, next_rec_id, prev_rec_id;
XTTabRecHeadDRec prev_rec_head;
if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
/* Now remove the record from the variation list,
* (if it is still on the list).
XT_TAB_ROW_WRITE_LOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
/* Find the variation before the variation we wish to remove: */
if (!(xt_tab_get_row(ot, row_id, &first_rec_id)))
next_rec_id = first_rec_id;
while (next_rec_id != rec_id) {
/* The record was not found in the list (we are done) */
XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
if (!xt_tab_get_rec_data(ot, next_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &prev_rec_head)) {
xt_log_and_clear_exception(self);
prev_rec_id = next_rec_id;
next_rec_id = XT_GET_DISK_4(prev_rec_head.tr_prev_rec_id_4);
if (next_rec_id == rec_id) {
/* The record was found on the list: */
/* Unlink the deleted variation:
* I have found the following sequence:
* 17933 in use 1906112
* 1906112 delete xact=2901 row=17933 prev=2419240
* 2419240 delete xact=2899 row=17933 prev=2153360
* 2153360 record-X C xact=2599 row=17933 prev=0 Xlog=151 Xoff=16824 Xsiz=100
* Despite the following facts which should prevent chains from
* --- Only one transaction can modify a row
* at any one time. So it is not possible for a new change
* to be linked onto an uncommitted change.
* --- Transactions that modify the same row
* twice do not allocate a new record for each change.
* -- A change that has been
* rolled back will not be linked onto. Instead
* the new transaction will link to the last.
* So if the sweeper is slow in doing its job
* we can have the situation that a number of records
* can refer to the last committed record of the
* Only one will be referenced by the row pointer.
* The others will all have been rolled back.
* This occurs over here: [(4)]
XT_SET_DISK_4(prev_rec_head.tr_prev_rec_id_4, after_rec_id);
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_UNLINKED, 0, prev_rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &prev_rec_head))
/* Variation to be removed at the front of the list. */
ASSERT(rec_id == first_rec_id);
/* Unlink the deleted variation, from the front of the list: */
if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_SET, row_id, after_rec_id))
/* No more variations, remove the row: */
if (!xt_tab_free_row(ot, tab, row_id))
XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
/* Note: even when not found on the row list, the record must still be deleted.
* There might be an exception to this, but there are very definite
* cases where this is required, for example when an unreferenced
* record is found and added to the clean up list xn_add_cu_record().
/* Delete the extended record and index entries:
* NOTE! This must be done after we have released the row lock, because
* a thread that does a duplicate check locks the index, and then
* checks whether a row is valid, and can deadlock with
* code that locks a row, then an index!
* However, this should all be OK, because the variation has been removed from the
* row variation list at this stage, and now just needs to be deleted.
xt_sw_delete_variation(self, ss, ot, rec_id, FALSE, row_id, xn_id);
XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
/* Go through all updated records of a transaction and clean up.
* This means, if the transaction was aborted, then all the variations written
* by the transaction must be removed.
* If the transaction was committed then we remove older variations.
* If a delete was committed this can lead to the row being removed.
* After a transaction has been cleaned it can be removed from RAM.
* If this was the last transaction in a log, and the log has reached
* threshold, and the log is no longer in exclusive use, then the log
* This function returns OK if the transaction was cleaned up, FALSE
* if a retry is required. Otherwise an error is thrown.
static xtBool xn_sw_cleanup_xact(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact)
XTDatabaseHPtr db = ss->ss_db;
XTXactLogBufferDPtr record;
if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, xact->xd_begin_log, xact->xd_begin_offset, FALSE))
xn_sw_could_go_faster(self, db);
if (!db->db_xlog.xlog_seq_next(&ss->ss_seqread, &record, FALSE, self))
/* Recovered transactions are considered cleaned when we
* reach the end of the transaction log.
* This is required, because transactions that do
* not have a commit (or rollback) record, because they were
* running when the server last went down, will otherwise not
* have the cleanup completed!!
ASSERT(xact->xd_flags & XT_XN_XAC_RECOVERED);
if (!(xact->xd_flags & XT_XN_XAC_RECOVERED))
switch (record->xh.xh_status_1) {
case XT_LOG_ENT_NEW_LOG:
if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, FALSE))
case XT_LOG_ENT_COMMIT:
case XT_LOG_ENT_ABORT:
xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
if (xn_id == xact->xd_start_xn_id)
case XT_LOG_ENT_REC_MODIFIED:
case XT_LOG_ENT_UPDATE:
case XT_LOG_ENT_INSERT:
case XT_LOG_ENT_DELETE:
case XT_LOG_ENT_UPDATE_BG:
case XT_LOG_ENT_INSERT_BG:
case XT_LOG_ENT_DELETE_BG:
xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
if (xn_id != xact->xd_start_xn_id)
tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xu.xu_status_1, record->xu.xu_rec_type_1, record->xu.xu_stat_id_1, row_id, &record->xu.xu_rec_type_1))
case XT_LOG_ENT_UPDATE_FL:
case XT_LOG_ENT_INSERT_FL:
case XT_LOG_ENT_DELETE_FL:
case XT_LOG_ENT_UPDATE_FL_BG:
case XT_LOG_ENT_INSERT_FL_BG:
case XT_LOG_ENT_DELETE_FL_BG:
xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
if (xn_id != xact->xd_start_xn_id)
tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xf.xf_status_1, record->xf.xf_rec_type_1, record->xf.xf_stat_id_1, row_id, &record->xf.xf_rec_type_1))
/* Write the log to indicate the transaction has been cleaned: */
XTXactCleanupEntryDRec cu;
cu.xc_status_1 = XT_LOG_ENT_CLEANUP;
cu.xc_checksum_1 = XT_CHECKSUM_1(XT_CHECKSUM4_XACT(xact->xd_start_xn_id));
XT_SET_DISK_4(cu.xc_xact_id_4, xact->xd_start_xn_id);
if (!xt_xlog_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, XT_XLOG_NO_WRITE_NO_FLUSH))
ss->ss_flush_pending = TRUE;
xact->xd_flags |= XT_XN_XAC_CLEANED;
#ifndef XT_SWEEPER_SORT_XACTS
ASSERT(db->db_xn_to_clean_id == xact->xd_start_xn_id);
#ifdef MUST_DELAY_REMOVE
xn_sw_add_xact_to_free(self, ss, xact->xd_start_xn_id);
xn_id = xact->xd_start_xn_id;
if (xt_xn_delete_xact(db, xn_id, self)) {
/* Recalculate the minimum memory transaction: */
ASSERT(!xt_xn_is_before(xn_id, db->db_xn_min_ram_id));
if (db->db_xn_min_ram_id == xn_id) {
db->db_xn_min_ram_id = xn_id+1;
xtXactID xn_curr_xn_id = xt_xn_get_curr_id(db);
while (!xt_xn_is_before(xn_curr_xn_id, db->db_xn_min_ram_id)) { // was db->db_xn_min_ram_id <= xn_curr_xn_id
/* db_xn_min_ram_id may be changed, by some other process! */
xn_id = db->db_xn_min_ram_id;
if (xn_get_xact_details(db, xn_id, self, NULL, NULL, NULL, NULL))
db->db_xn_min_ram_id = xn_id+1;
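/*
 * Release the sweeper state: close any table held open by the sweeper,
 * release the log sequential-read handle and shrink the data and to-free
 * buffers to zero. Registered as the cleanup handler for the state
 * allocated with alloczr_() in xn_sw_main() below.
 */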
static void xn_free_sw_state(XTThreadPtr self, XNSweeperStatePtr ss)
xn_sw_close_open_table(self, ss);
ss->ss_db->db_xlog.xlog_seq_exit(&ss->ss_seqread);
xt_db_set_size(self, &ss->ss_databuf, 0);
xt_bq_set_size(self, &ss->ss_to_free, 0);
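/*
 * qsort() comparator used when XT_SWEEPER_SORT_XACTS is defined: orders
 * transactions by their end (commit/abort) ID, using the start ID as a
 * tie-breaker, so that the sweeper cleans transactions in end order.
 */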
#ifdef XT_SWEEPER_SORT_XACTS
static int xn_compare_xact(const void *a, const void *b)
register XTXactDataPtr b_a = *((XTXactDataPtr *) a);
register XTXactDataPtr b_b = *((XTXactDataPtr *) b);
if (b_a->xd_end_xn_id == b_b->xd_end_xn_id) {
if (b_a->xd_start_xn_id < b_b->xd_start_xn_id)
if (b_a->xd_end_xn_id < b_b->xd_end_xn_id)
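/*
 * Main loop of the sweeper thread: repeatedly pick the next transaction to
 * clean (db_xn_to_clean_id), clean it with xn_sw_cleanup_xact(), and advance.
 * When there is nothing to do, the thread flushes any pending log data and
 * then waits, polling roughly every 1/10 second in xn_sw_wait_for_xact().
 */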
static void xn_sw_main(XTThreadPtr self)
XTDatabaseHPtr db = self->st_database;
XNSweeperStatePtr ss;
time_t idle_start = 0;
#ifdef XT_SWEEPER_SORT_XACTS
xtXactID next_clean_id;
XTXactDataPtr xact2;
xt_set_priority(self, xt_db_sweeper_priority);
alloczr_(ss, xn_free_sw_state, sizeof(XNSweeperStateRec), XNSweeperStatePtr);
if (!db->db_xlog.xlog_seq_init(&ss->ss_seqread, xt_db_log_buffer_size, FALSE))
ss->ss_to_free.bq_item_size = sizeof(XNSWToFreeItemRec);
ss->ss_to_free.bq_max_waste = XT_TN_MAX_TO_FREE_WASTE;
ss->ss_to_free.bq_item_inc = XT_TN_MAX_TO_FREE_INC;
ss->ss_call_cnt = 0;
ss->ss_flush_pending = FALSE;
while (!self->t_quit) {
while (!self->t_quit) {
curr_id = xt_xn_get_curr_id(db);
#ifdef XT_SWEEPER_SORT_XACTS
/* Add transactions to the list if required: */
while (db->db_sw_list_size < XT_SW_XACT_SORT_LIST_SIZE &&
!xt_xn_is_before(curr_id, db->db_sw_to_add)) {
if ((xact = xt_xn_get_xact(db, db->db_sw_to_add, self))) {
/* Only add transactions that have completed: */
if (!(xact->xd_flags & XT_XN_XAC_SWEEP))
/* Add only transactions that did an update to the list: */
if ((xact->xd_flags & XT_XN_XAC_LOGGED)) {
db->db_sw_xact_list[db->db_sw_list_size] = xact;
db->db_sw_list_size++;
/* Should not be required (done by the transaction itself)! */
if (xt_xn_delete_xact(db, db->db_sw_to_add, self)) {
if (db->db_xn_min_ram_id == db->db_sw_to_add)
db->db_xn_min_ram_id = db->db_sw_to_add+1;
/* If there are no transactions to be cleaned, then the
* next to clean will be at least the next one to check.
if (!db->db_sw_list_size)
db->db_xn_to_clean_id = db->db_sw_to_add;
if (!db->db_sw_list_size) {
/* Nothing to do: */
db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
if (db->db_sw_list_size == XT_SW_XACT_SORT_LIST_SIZE)
db->db_sw_faster |= XT_SW_TOO_FAR_BEHIND;
xn_sw_could_go_faster(self, db);
/* Sort the transactions according to their end order: */
qsort(db->db_sw_xact_list, db->db_sw_list_size, sizeof(XTXactDataPtr), xn_compare_xact);
for (i=0; i<db->db_sw_list_size; i++) {
xact = db->db_sw_xact_list[i];
if (!xt_xn_is_before(xact->xd_end_xn_id, db->db_sw_to_add)) // xact->xd_end_xn_id >= db->db_sw_to_add
if (!xn_sw_cleanup_xact(self, ss, xact)) {
/* We failed to clean (try again later)... */
#ifdef TRACE_SWEEPER_ACTIVITY
printf("SWEEPER: cleanup retry %d...\n", (int) xact->xd_start_xn_id);
if (i == db->db_sw_list_size) {
/* All cleaned out: */
db->db_xn_to_clean_id = db->db_sw_to_add;
db->db_sw_list_size = 0;
/* The next to clean will be the smallest still in the
* NOTE: db_xn_to_clean_id means that all transactions
* before this are clean.
* It may be that some after this point have also
next_clean_id = db->db_sw_xact_list[i]->xd_start_xn_id;
for (j=i+1; j<db->db_sw_list_size; j++) {
if (xt_xn_is_before(db->db_sw_xact_list[j]->xd_start_xn_id, next_clean_id))
next_clean_id = db->db_sw_xact_list[j]->xd_start_xn_id;
db->db_xn_to_clean_id = next_clean_id;
/* Something to do, but nothing done! */
if ((xact = xt_xn_get_xact(db, db->db_sw_to_add, self))) {
/* Before we go to sleep, let's just check again: */
if (!(xact->xd_flags & XT_XN_XAC_SWEEP)) {
db->db_stat_sweep_waits++;
memmove(db->db_sw_xact_list, &db->db_sw_xact_list[i], (db->db_sw_list_size - i) * sizeof(XTXactDataPtr));
db->db_sw_list_size -= i;
/* We are just about to check the condition for sleeping,
* so if the condition for sleeping holds, then we will
* exit the loop and sleep.
* We will then sleep if nobody sets the flag before we
* actually do sleep!
if (xt_xn_is_before(curr_id, db->db_xn_to_clean_id)) {
db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
/* {TUNING} How far do we allow the sweeper to get behind?
* The higher this is, the higher burst performance can
* be. But too high and the sweeper falls out of reading the
* transaction log cache, and also starts to spread
* changes around in index and data blocks that are no
if (curr_id - db->db_xn_to_clean_id > 250)
db->db_sw_faster |= XT_SW_TOO_FAR_BEHIND;
db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
xn_sw_could_go_faster(self, db);
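/* Fetch the next transaction to clean. It may only be swept once its sweep
 * flag is set, it is not prepared, and every transaction that started before
 * its end has itself ended; otherwise we have to wait:
 */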
if ((xact = xt_xn_get_xact(db, db->db_xn_to_clean_id, self))) {
/* The sweep flag is set when the transaction is ready for sweeping.
* Prepared transactions may not be swept!
if (!(xact->xd_flags & XT_XN_XAC_SWEEP) || (xact->xd_flags & XT_XN_XAC_PREPARED))
/* Check if we can clean up the transaction.
* We do this by checking to see if there is any running
* transaction which started before the end of this transaction.
xn_id = xact->xd_start_xn_id;
while (xt_xn_is_before(xn_id, xact->xd_end_xn_id)) {
if ((xact2 = xt_xn_get_xact(db, xn_id, self))) {
if (!(xact2->xd_flags & XT_XN_XAC_ENDED)) {
/* A transaction was started before the end of
* the transaction we wish to sweep, and this
* transaction has not committed, then we have to
db->db_stat_sweep_waits++;
/* Can clean up the transaction, and move to the next. */
if (xact->xd_flags & XT_XN_XAC_LOGGED) {
#ifdef TRACE_SWEEPER_ACTIVITY
printf("SWEEPER: cleanup %d\n", (int) xact->xd_start_xn_id);
if (!xn_sw_cleanup_xact(self, ss, xact)) {
/* We failed to clean (try again later)... */
#ifdef TRACE_SWEEPER_ACTIVITY
printf("SWEEPER: cleanup retry %d...\n", (int) xact->xd_start_xn_id);
#ifdef TRACE_SWEEPER_ACTIVITY
printf("SWEEPER: cleanup %d DONE\n", (int) xact->xd_start_xn_id);
/* This was a read-only transaction, it is safe to
* just remove the transaction structure from memory.
* (should not be necessary because RO transactions
* do this themselves):
if (xt_xn_delete_xact(db, db->db_xn_to_clean_id, self)) {
if (db->db_xn_min_ram_id == db->db_xn_to_clean_id)
db->db_xn_min_ram_id = db->db_xn_to_clean_id+1;
/* Move on to clean the next: */
db->db_xn_to_clean_id++;
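/* Dropped out of the inner loop: close the open table, let the sweeper go
 * slower again, and do some housekeeping before going idle.
 */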
xn_sw_close_open_table(self, ss);
xn_sw_go_slower(self, db);
/* Shrink the free list, if it is empty and larger than
if (ss->ss_to_free.bq_size > XT_TN_MAX_TO_FREE) {
if (ss->ss_to_free.bq_front == 0 && ss->ss_to_free.bq_back == 0)
xt_bq_set_size(self, &ss->ss_to_free, XT_TN_MAX_TO_FREE);
/* Windows: close the log file that we have open for reading, if we
* read past the end of the log on the last transaction.
* This makes sure that the log is closed when the checkpointer
* tries to remove or rename it!!
if (ss->ss_seqread.xseq_log_file) {
if (ss->ss_seqread.xseq_rec_log_id != ss->ss_seqread.xseq_log_id)
db->db_xlog.xlog_seq_close(&ss->ss_seqread);
if (ss->ss_flush_pending) {
/* Flush pending means we have written something to the log.
* if so we flush the log so that the writer will also do
* This will lead to the freeer continuing if it is waiting.
time_t now = time(NULL);
/* By default, we wait for 2 seconds idle time, then
if (now >= idle_start + 2) {
/* Don't do this if the flusher is active! */
if (!db->db_fl_thread &&
!xt_xlog_flush_log(db, self))
ss->ss_flush_pending = FALSE;
/* {WAKE-SW} Waking up the sweeper is very expensive!
* Cost is 3% of execution time on the test:
* runTest(SMALL_SELECT_TEST, 2, 100000)
* On the other hand, polling every 1/10 second
* is cheap, because the check for transactions
* ready for cleanup is very quick.
* So this is the preferred method.
xn_sw_wait_for_xact(self, db, 10);
if (ss->ss_flush_pending) {
xt_xlog_flush_log(db, self);
ss->ss_flush_pending = FALSE;
freer_(); // xn_free_sw_state(ss)
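/*
 * Thread entry point for the sweeper: attaches a MySQL thread context, puts
 * the database "in use", runs the sweeper main loop, and logs and pauses on
 * errors before trying again. SIGTERM and missing-dictionary errors are
 * expected during shutdown and are not logged.
 */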
static void *xn_sw_run_thread(XTThreadPtr self)
XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data;
if (!(mysql_thread = myxt_create_thread()))
while (!self->t_quit) {
* The garbage collector requires that the database
* is in use because.
xt_use_database(self, db, XT_FOR_SWEEPER);
/* {BACKGROUND-RELEASE-DB}
* This action is both safe and required:
* safe: releasing the database is safe because as
* long as this thread is running the database
* reference is valid, and this reference cannot
* be the only one to the database because
* otherwise this thread would not be running.
* required: releasing the database is necessary
* otherwise we cannot close the database
* correctly because we only shut down this
* thread when the database is closed and we
* only close the database when all references
xt_heap_release(self, self->st_database);
/* This error is "normal"! */
if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
self->t_exception.e_sys_err == SIGTERM))
xt_log_and_clear_exception(self);
/* Avoid releasing the database (done above) */
self->st_database = NULL;
xt_unuse_database(self, self);
/* After an exception, pause before trying again... */
/* Number of seconds */
db->db_sw_idle = XT_THREAD_INERR;
while (!self->t_quit && count > 0) {
db->db_sw_idle = XT_THREAD_BUSY;
* {MYSQL-THREAD-KILL}
myxt_destroy_thread(mysql_thread, TRUE);
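/*
 * Thread-data destructor for the sweeper thread: clears db->db_sw_thread
 * under db_sw_lock when the thread object is freed.
 */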
static void xn_sw_free_thread(XTThreadPtr self, void *data)
XTDatabaseHPtr db = (XTDatabaseHPtr) data;
if (db->db_sw_thread) {
xt_lock_mutex(self, &db->db_sw_lock);
pushr_(xt_unlock_mutex, &db->db_sw_lock);
db->db_sw_thread = NULL;
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
/* Wait for a transaction to quit: */
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs)
xt_lock_mutex(self, &db->db_sw_lock);
pushr_(xt_unlock_mutex, &db->db_sw_lock);
db->db_sw_idle = XT_THREAD_IDLE;
if (!self->t_quit && !db->db_sw_faster)
xt_timed_wait_cond(self, &db->db_sw_cond, &db->db_sw_lock, hsecs * 10);
db->db_sw_idle = XT_THREAD_BUSY;
db->db_sw_check_count++;
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
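/*
 * Create and start the sweeper daemon thread for a database. The thread is
 * named "SW-<database directory>" and xn_sw_free_thread() is registered as
 * its thread-data destructor.
 */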
xtPublic void xt_start_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
char name[PATH_MAX];
sprintf(name, "SW-%s", xt_last_directory_of_path(db->db_main_path));
xt_remove_dir_char(name);
db->db_sw_thread = xt_create_daemon(self, name);
xt_set_thread_data(db->db_sw_thread, db, xn_sw_free_thread);
xt_run_thread(self, db->db_sw_thread, xn_sw_run_thread);
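/*
 * Wait for the initial sweep (after recovery) to catch up with the current
 * transaction ID, waking the sweeper and sleeping in 10 millisecond steps.
 * If this takes longer than about 2 seconds, progress is logged in percent.
 */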
xtPublic void xt_init_sweeper_wait(XTThreadPtr self, XTDatabaseHPtr db)
xtXactID current_id;
current_id = db->db_xn_curr_id;
if (!xt_xn_is_before(current_id, db->db_xn_to_clean_id)) {
double init_diff, curr_done = 0;
xtBool print_progress = FALSE;
int perc_to_print = 1;
init_diff = (double) xt_xn_diff(current_id, db->db_xn_to_clean_id);
start_time = time(NULL);
xt_logf(XT_NT_INFO, "PBXT: Initial sweep, transactions to scan: %llu\n", (u_llong) init_diff);
while (!xt_xn_is_before(current_id, db->db_xn_to_clean_id)) { // means: db->db_xn_to_clean_id <= current_id
xt_lock_mutex(self, &db->db_sw_lock);
pushr_(xt_unlock_mutex, &db->db_sw_lock);
xt_wakeup_sweeper(db);
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
xt_sleep_milli_second(10);
if (!print_progress) {
if (time(NULL) - start_time > 2)
print_progress = TRUE;
if (print_progress) {
curr_done = init_diff - (double) xt_xn_diff(current_id, db->db_xn_to_clean_id);
while (perc_to_print <= (int) (curr_done / init_diff * 100)) {
if (((perc_to_print - 1) % 25) == 0)
xt_logf(XT_NT_INFO, "PBXT: ");
if ((perc_to_print % 25) == 0)
xt_logf(XT_NT_INFO, "%2d\n", perc_to_print);
xt_logf(XT_NT_INFO, "%2d ", perc_to_print);
if (print_progress) {
while (perc_to_print <= 100) {
if (((perc_to_print - 1) % 25) == 0)
xt_logf(XT_NT_INFO, "PBXT: ");
if ((perc_to_print % 25) == 0)
xt_logf(XT_NT_INFO, "%2d\n", perc_to_print);
xt_logf(XT_NT_INFO, "%2d ", perc_to_print);
xt_logf(XT_NT_INFO, "PBXT: Initial sweep complete, transactions scanned: %llu\n", (u_llong) init_diff);
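/*
 * Wait until the sweeper has cleaned up to the current transaction ID, or
 * until abort_time seconds have passed (0 means wait indefinitely). Logs a
 * "Waiting for ... sweeper" message if the wait takes more than 2 seconds.
 */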
xtPublic void xt_wait_for_sweeper(XTThreadPtr self, XTDatabaseHPtr db, int abort_time)
xtBool message = FALSE;
if (db->db_sw_thread) {
/* Changed xt_xn_get_curr_id(db) to db->db_xn_curr_id.
* This should work because we are not concerned about the difference
* between xt_xn_get_curr_id(db) and db->db_xn_curr_id,
* which is just a matter of which transactions we can expect to find
* in memory (see {GAP-INC-ADD-XACT})
while (!xt_xn_is_before(db->db_xn_curr_id, db->db_xn_to_clean_id)) { // was db->db_xn_to_clean_id <= xt_xn_get_curr_id(db)
xt_lock_mutex(self, &db->db_sw_lock);
pushr_(xt_unlock_mutex, &db->db_sw_lock);
xt_wakeup_sweeper(db);
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
xt_sleep_milli_second(10);
if (abort_time && now >= then + abort_time) {
xt_logf(XT_NT_INFO, "Aborting wait for '%s' sweeper\n", db->db_name);
if (now >= then + 2) {
xt_logf(XT_NT_INFO, "Waiting for '%s' sweeper...\n", db->db_name);
xt_logf(XT_NT_INFO, "Sweeper '%s' done.\n", db->db_name);
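/*
 * Stop the sweeper thread: mark it terminated, wake it, signal it and wait
 * for it to exit, then clear db->db_sw_thread. All done under db_sw_lock.
 */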
xtPublic void xt_stop_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
if (db->db_sw_thread) {
xt_lock_mutex(self, &db->db_sw_lock);
pushr_(xt_unlock_mutex, &db->db_sw_lock);
/* This pointer is safe as long as you have the transaction lock. */
if ((thr_sw = db->db_sw_thread)) {
xtThreadID tid = thr_sw->t_id;
/* Make sure the thread quits when woken up. */
xt_terminate_thread(self, thr_sw);
xt_wakeup_sweeper(db);
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
* GOTCHA: This is a weird thing, but a SIGTERM directed
* at a particular thread (in this case the sweeper) was
* being caught by a different thread and killing the server
* sometimes. Disconcerting.
* (this may only be a problem on Mac OS X)
xt_kill_thread(thread);
xt_wait_for_thread_to_exit(tid, FALSE);
/* PMC - It should not be necessary to set the signal here, but in the
* debugger the handler is not called!!?
thr_sw->t_delayed_signal = SIGTERM;
xt_kill_thread(thread);
db->db_sw_thread = NULL;
freer_(); // xt_unlock_mutex(&db->db_sw_lock)
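/*
 * Wake the sweeper if it is idle. The idle flag is checked without taking
 * db_sw_lock (see the note below), so a wakeup can occasionally be missed;
 * the sweeper's timed wait covers that case.
 */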
xtPublic void xt_wakeup_sweeper(XTDatabaseHPtr db)
/* This flag makes the gap for the race condition
* However, this possibility still remains because
* we do not lock the mutex db_sw_lock here.
* The reason is that it is too expensive.
* In the event that the wakeup is missed, the sleeper's
* wait will time out eventually.
if (db->db_sw_idle) {
if (!xt_broadcast_cond_ns(&db->db_sw_cond))
xt_log_and_clear_exception_ns();