1
/* Copyright (c) 2005 PrimeBase Technologies GmbH
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* 2005-01-03 Paul McCullagh
24
#include "xt_config.h"
33
#include <sys/resource.h>
43
#include "strutil_xt.h"
44
#include "pthread_xt.h"
45
#include "thread_xt.h"
46
#include "memory_xt.h"
47
#include "sortedlist_xt.h"
50
#include "database_xt.h"
53
//#define DEBUG_NO_ACTIVITY
56
void xt_db_exit_thread(XTThreadPtr self);
58
static void thr_accumulate_statistics(XTThreadPtr self);
60
#define XT_NO_THREAD_ID 0
61
#define XT_SYS_THREAD_ID 1
62
#define XT_MIN_THREAD_ID 2
65
* -----------------------------------------------------------------------
69
xtPublic u_int xt_thr_maximum_threads;
70
xtPublic u_int xt_thr_current_thread_count;
71
xtPublic u_int xt_thr_current_max_threads;
73
/* This structure is a double linked list of thread, with a wait
76
static XTLinkedListPtr thr_list;
78
/* This structure maps thread ID's to thread pointers. */
79
THR_ARRAY_LOCK_TYPE xt_thr_array_resize_lock;
80
xtPublic XTThreadDataRec *xt_thr_array;
81
static xt_mutex_type thr_array_lock;
83
/* Global accumulated statistics: */
84
static XTStatisticsRec thr_statistics;
87
static void break_in_assertion(c_char *expr, c_char *func, c_char *file, u_int line)
89
printf("%s(%s:%d) %s\n", func, file, (int) line, expr);
94
* -----------------------------------------------------------------------
98
void XTLockTask::tk_init(struct XTThread *self)
100
xt_init_mutex_with_autoname(self, <_mutex);
103
void XTLockTask::tk_exit()
105
xt_free_mutex(<_mutex);
109
void XTLockTask::tk_lock()
111
xt_lock_mutex_ns(<_mutex);
114
void XTLockTask::tk_unlock()
116
xt_unlock_mutex_ns(<_mutex);
120
* -----------------------------------------------------------------------
124
static xt_mutex_type log_mutex;
125
static int log_level = 0;
126
static FILE *log_file = NULL;
127
static xtBool log_newline = TRUE;
129
xtPublic xtBool xt_init_logging(void)
134
log_level = XT_LOG_TRACE;
135
err = xt_p_mutex_init_with_autoname(&log_mutex, NULL);
137
xt_log_errno(XT_NS_CONTEXT, err);
142
if (!xt_init_trace()) {
149
xtPublic void xt_exit_logging(void)
152
xt_free_mutex(&log_mutex);
158
xtPublic void xt_get_now(char *buffer, size_t len)
164
if (ticks == (time_t) -1) {
166
printf(buffer, "** error %d getting time **", errno);
168
snprintf(buffer, len, "** error %d getting time **", errno);
172
localtime_r(&ticks, <ime);
173
strftime(buffer, len, "%y%m%d %H:%M:%S", <ime);
176
static void thr_log_newline(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level)
180
char thr_name[XT_THR_NAME_SIZE+3];
182
xt_get_now(time_str, 200);
183
if (self && *self->t_name) {
184
xt_strcpy(XT_THR_NAME_SIZE+3, thr_name, " ");
185
xt_strcat(XT_THR_NAME_SIZE+3, thr_name, self->t_name);
190
case XT_LOG_FATAL: level_str = " [Fatal]"; break;
191
case XT_LOG_ERROR: level_str = " [Error]"; break;
192
case XT_LOG_WARNING: level_str = " [Warning]"; break;
193
case XT_LOG_INFO: level_str = " [Note]"; break;
194
case XT_LOG_TRACE: level_str = " [Trace]"; break;
195
default: level_str = " "; break;
197
if (func && *func && *func != '-') {
198
char func_name[XT_MAX_FUNC_NAME_SIZE];
200
xt_strcpy_term(XT_MAX_FUNC_NAME_SIZE, func_name, func, '(');
202
fprintf(log_file, "%s%s%s %s(%s:%d) ", time_str, level_str, thr_name, func_name, xt_last_name_of_path(file), line);
204
fprintf(log_file, "%s%s%s %s() ", time_str, level_str, thr_name, func_name);
208
fprintf(log_file, "%s%s%s [%s:%d] ", time_str, level_str, thr_name, xt_last_name_of_path(file), line);
210
fprintf(log_file, "%s%s%s ", time_str, level_str, thr_name);
215
/* Windows uses printf()!! */
216
#define DEFAULT_LOG_BUFFER_SIZE 2000
219
#define DEFAULT_LOG_BUFFER_SIZE 10
221
#define DEFAULT_LOG_BUFFER_SIZE 2000
225
void xt_log_flush(XTThreadPtr XT_UNUSED(self))
231
* Log the given formated string information to the log file.
232
* Before each new line, this function writes the
233
* log header, which includes the time, log level,
234
* and source file and line number (optional).
236
static void thr_log_va(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, c_char *fmt, va_list ap)
238
char buffer[DEFAULT_LOG_BUFFER_SIZE];
239
char *log_string = NULL;
241
if (level > log_level)
244
xt_lock_mutex_ns(&log_mutex);
247
vsprintf(buffer, fmt, ap);
250
#if !defined(va_copy) || defined(XT_SOLARIS)
253
len = vsnprintf(buffer, DEFAULT_LOG_BUFFER_SIZE-1, fmt, ap);
254
if (len > DEFAULT_LOG_BUFFER_SIZE-1)
255
len = DEFAULT_LOG_BUFFER_SIZE-1;
259
/* Use the buffer, unless it is too small */
264
bufsize = vsnprintf(buffer, DEFAULT_LOG_BUFFER_SIZE, fmt, ap);
265
if (bufsize >= DEFAULT_LOG_BUFFER_SIZE) {
266
log_string = (char *) malloc(bufsize + 1);
267
if (vsnprintf(log_string, bufsize + 1, fmt, ap2) > bufsize) {
278
char *str, *str_end, tmp_ch;
283
thr_log_newline(self, func, file, line, level);
286
str_end = strchr(str, '\n');
294
str_end = str + strlen(str);
297
fprintf(log_file, "%s", str);
303
if (log_string != buffer)
307
xt_unlock_mutex_ns(&log_mutex);
310
xtPublic void xt_logf(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, c_char *fmt, ...)
315
thr_log_va(self, func, file, line, level, fmt, ap);
319
xtPublic void xt_log(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, c_char *string)
321
xt_logf(self, func, file, line, level, "%s", string);
324
static int thr_log_error_va(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, int xt_err, int sys_err, c_char *fmt, va_list ap)
327
char xt_err_string[50];
331
case XT_ASSERTION_FAILURE:
332
strcpy(xt_err_string, "Assertion");
333
default_level = XT_LOG_FATAL;
335
case XT_SYSTEM_ERROR:
336
strcpy(xt_err_string, "errno");
337
default_level = XT_LOG_ERROR;
339
case XT_SIGNAL_CAUGHT:
340
strcpy(xt_err_string, "Signal");
341
default_level = XT_LOG_ERROR;
344
sprintf(xt_err_string, "%d", xt_err);
345
default_level = XT_LOG_ERROR;
348
if (level == XT_LOG_DEFAULT)
349
level = default_level;
351
if (*xt_err_string) {
353
xt_logf(self, func, file, line, level, "%s (%d): ", xt_err_string, sys_err);
355
xt_logf(self, func, file, line, level, "%s: ", xt_err_string);
357
thr_log_va(self, func, file, line, level, fmt, ap);
358
xt_logf(self, func, file, line, level, "\n");
362
/* The function returns the actual log level used. */
363
xtPublic int xt_log_errorf(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, int xt_err, int sys_err, c_char *fmt, ...)
368
level = thr_log_error_va(self, func, file, line, level, xt_err, sys_err, fmt, ap);
373
/* The function returns the actual log level used. */
374
xtPublic int xt_log_error(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, int xt_err, int sys_err, c_char *string)
376
return xt_log_errorf(self, func, file, line, level, xt_err, sys_err, "%s", string);
379
xtPublic void xt_log_exception(XTThreadPtr self, XTExceptionPtr e, int level)
381
ASSERT_NS(e->e_xt_err);
382
level = xt_log_error(
391
/* Dump the catch trace: */
392
if (*e->e_catch_trace)
393
xt_logf(self, NULL, NULL, 0, level, "%s", e->e_catch_trace);
396
xtPublic void xt_log_and_clear_exception(XTThreadPtr self)
398
xt_log_exception(self, &self->t_exception, XT_LOG_DEFAULT);
399
xt_clear_exception(self);
402
xtPublic void xt_log_and_clear_exception_ns(void)
404
xt_log_and_clear_exception(xt_get_self());
407
xtPublic void xt_log_and_clear_warning(XTThreadPtr self)
409
xt_log_exception(self, &self->t_exception, XT_LOG_WARNING);
410
xt_clear_exception(self);
413
xtPublic void xt_log_and_clear_warning_ns(void)
415
xt_log_and_clear_warning(xt_get_self());
419
* -----------------------------------------------------------------------
423
static void thr_add_catch_trace(XTExceptionPtr e, c_char *func, c_char *file, u_int line)
425
if (func && *func && *func != '-') {
426
xt_strcat_term(XT_CATCH_TRACE_SIZE, e->e_catch_trace, func, '(');
427
xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, "(");
430
xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, xt_last_name_of_path(file));
434
sprintf(buffer, "%u", line);
435
xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, ":");
436
xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, buffer);
439
if (func && *func && *func != '-')
440
xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, ")");
441
xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, "\n");
444
static void thr_save_error_va(XTExceptionPtr e, XTThreadPtr self, xtBool throw_it, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *fmt, va_list ap)
451
e->e_xt_err = xt_err;
452
e->e_sys_err = sys_err;
453
vsnprintf(e->e_err_msg, XT_ERR_MSG_SIZE, fmt, ap);
455
/* Make the first character of the message upper case: */
456
/* This did not work for foreign languages! */
457
if (e->e_err_msg[0] >= 'a' && e->e_err_msg[0] <= 'z')
458
e->e_err_msg[0] = (char) toupper(e->e_err_msg[0]);
460
if (func && *func && *func != '-')
461
xt_strcpy_term(XT_MAX_FUNC_NAME_SIZE, e->e_func_name, func, '(');
465
xt_strcpy(XT_SOURCE_FILE_NAME_SIZE, e->e_source_file, xt_last_name_of_path(file));
466
e->e_source_line = line;
469
*e->e_source_file = 0;
470
e->e_source_line = 0;
472
*e->e_catch_trace = 0;
477
/* Create a stack trace for this exception: */
478
thr_add_catch_trace(e, func, file, line);
479
for (i=self->t_call_top-1; i>=0; i--)
480
thr_add_catch_trace(e, self->t_call_stack[i].cs_func, self->t_call_stack[i].cs_file, self->t_call_stack[i].cs_line);
487
* -----------------------------------------------------------------------
488
* THROWING EXCEPTIONS
491
/* If we have to allocate resources and the hold them temporarily during which
492
* time an exception could occur, then these functions provide a holding
493
* place for the data, which will be freed in the case of an exception.
495
* Note: the free functions could themselves allocated resources.
496
* to make sure all things work out we only remove the resource from
497
* then stack when it is freed.
499
static void thr_free_resources(XTThreadPtr self, XTResourcePtr top)
502
XTThreadFreeFunc free_func;
506
while (self->t_res_top > top) {
507
/* Pop the top resource: */
508
rp = (XTResourcePtr) (((char *) self->t_res_top) - self->t_res_top->r_prev_size);
510
/* Free the resource: */
511
if (rp->r_free_func) {
512
free_func = rp->r_free_func;
513
rp->r_free_func = NULL;
514
free_func(self, rp->r_data);
517
self->t_res_top = rp;
521
xtPublic void xt_bug(XTThreadPtr XT_UNUSED(self))
523
static int *bug_ptr = NULL;
529
* This function is called when an exception is caught.
530
* It restores the function call top and frees
531
* any resource allocated by lower levels.
533
xtPublic void xt_caught(XTThreadPtr self)
535
/* Restore the call top: */
536
self->t_call_top = self->t_jmp_env[self->t_jmp_depth].jb_call_top;
538
/* Free the temporary data that would otherwize be lost
539
* This should do nothing, because we actually free things on throw
542
thr_free_resources(self, self->t_jmp_env[self->t_jmp_depth].jb_res_top);
545
xtPublic void xt_enter_exception_handler(XTThreadPtr self, XTExceptionPtr e)
547
ASSERT_NS(self->t_exception.e_xt_err);
548
self->t_in_handler++;
549
*e = self->t_exception;
550
/* If freeing resources, generates an error, we will know: */
551
self->t_exception.e_xt_err = 0;
554
xtPublic void xt_exit_exception_handler(XTThreadPtr self, XTExceptionPtr e)
556
if (self->t_exception.e_xt_err)
557
xt_log_and_clear_exception(self);
558
/* Restore the exception: */
559
self->t_exception = *e;
560
self->t_in_handler--;
563
/* Throw an already registered error: */
564
xtPublic void xt_throw(XTThreadPtr self)
567
/* We should not throw an exception in an exception handler! */
568
ASSERT_NS(!self->t_in_handler);
570
/* We cannot actually handle the situation that an exception is
571
* thrown when freeing resources here!
573
if (self->t_jmp_depth > 0 && self->t_jmp_depth <= XT_MAX_JMP) {
574
/* Save the exception, in case it is overwritten during free
579
/* As recommended by Barry: free the resources before the stack is invalid! */
580
xt_enter_exception_handler(self, &e);
581
thr_free_resources(self, self->t_jmp_env[self->t_jmp_depth-1].jb_res_top);
582
xt_exit_exception_handler(self, &e);
584
/* Then do the longjmp: */
585
if (!self->t_in_handler)
586
longjmp(self->t_jmp_env[self->t_jmp_depth-1].jb_buffer, 1);
591
* We cannot throw an error, because it will not be caught.
592
* This means there is no try ... catch block above.
593
* In this case, we just return.
594
* The calling functions must handle errors...
596
xt_log(XT_CONTEXT, XT_LOG_FATAL, "Uncaught exception\n");
597
xt_exit_thread(self, NULL);
601
xtPublic void xt_throwf(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *fmt, ...)
604
XTThreadPtr thread = self ? self : xt_get_self();
607
thr_save_error_va(thread ? &thread->t_exception : NULL, thread, self ? TRUE : FALSE, func, file, line, xt_err, sys_err, fmt, ap);
611
xtPublic void xt_throw_error(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *msg)
613
xt_throwf(self, func, file, line, xt_err, sys_err, "%s", msg);
616
#define XT_SYS_ERR_SIZE 300
619
static c_char *thr_get_sys_error(int err, char *err_msg)
621
static c_char *thr_get_sys_error(int err, char *XT_UNUSED(err_msg))
627
if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL,
628
err, 0, err_msg, XT_SYS_ERR_SIZE, NULL)) {
629
return strerror(err);
632
ptr = &err_msg[strlen(err_msg)];
633
while (ptr-1 > err_msg) {
634
if (*(ptr-1) != '\n' && *(ptr-1) != '\r' && *(ptr-1) != '.')
641
return strerror(err);
645
static c_char *thr_get_err_string(int xt_err)
650
case XT_ERR_STACK_OVERFLOW: str = "Stack overflow"; break;
651
case XT_ERR_JUMP_OVERFLOW: str = "Jump overflow"; break;
652
case XT_ERR_TABLE_EXISTS: str = "Table `%s` already exists"; break;
653
case XT_ERR_NAME_TOO_LONG: str = "Name '%s' is too long"; break;
654
case XT_ERR_TABLE_NOT_FOUND: str = "Table `%s` not found"; break;
655
case XT_ERR_SESSION_NOT_FOUND: str = "Session %s not found"; break;
656
case XT_ERR_BAD_ADDRESS: str = "Incorrect address '%s'"; break;
657
case XT_ERR_UNKNOWN_SERVICE: str = "Unknown service '%s'"; break;
658
case XT_ERR_UNKNOWN_HOST: str = "Host '%s' not found"; break;
659
case XT_ERR_TOKEN_EXPECTED: str = "%s expected in place of %s"; break;
660
case XT_ERR_PROPERTY_REQUIRED: str = "Property '%s' required"; break;
661
case XT_ERR_DEADLOCK: str = "Deadlock, transaction aborted"; break;
662
case XT_ERR_CANNOT_CHANGE_DB: str = "Cannot change database while transaction is in progress"; break;
663
case XT_ERR_ILLEGAL_CHAR: str = "Illegal character: '%s'"; break;
664
case XT_ERR_UNTERMINATED_STRING:str = "Unterminated string: %s"; break;
665
case XT_ERR_SYNTAX: str = "Syntax error near %s"; break;
666
case XT_ERR_ILLEGAL_INSTRUCTION:str = "Illegal instruction"; break;
667
case XT_ERR_OUT_OF_BOUNDS: str = "Memory reference out of bounds"; break;
668
case XT_ERR_STACK_UNDERFLOW: str = "Stack underflow"; break;
669
case XT_ERR_TYPE_MISMATCH: str = "Type mismatch"; break;
670
case XT_ERR_ILLEGAL_TYPE: str = "Illegal type for operator"; break;
671
case XT_ERR_ID_TOO_LONG: str = "Identifier too long: %s"; break;
672
case XT_ERR_TYPE_OVERFLOW: str = "Type overflow: %s"; break;
673
case XT_ERR_TABLE_IN_USE: str = "Table `%s` in use"; break;
674
case XT_ERR_NO_DATABASE_IN_USE: str = "No database in use"; break;
675
case XT_ERR_CANNOT_RESOLVE_TYPE:str = "Cannot resolve type with ID: %s"; break;
676
case XT_ERR_BAD_INDEX_DESC: str = "Unsupported index description: %s"; break;
677
case XT_ERR_WRONG_NO_OF_VALUES: str = "Incorrect number of values"; break;
678
case XT_ERR_CANNOT_OUTPUT_VALUE:str = "Cannot output given type"; break;
679
case XT_ERR_COLUMN_NOT_FOUND: str = "Column `%s.%s` not found"; break;
680
case XT_ERR_NOT_IMPLEMENTED: str = "Not implemented"; break;
681
case XT_ERR_UNEXPECTED_EOS: str = "Connection unexpectedly lost"; break;
682
case XT_ERR_BAD_TOKEN: str = "Incorrect binary token"; break;
683
case XT_ERR_RES_STACK_OVERFLOW: str = "Internal error: resource stack overflow"; break;
684
case XT_ERR_BAD_INDEX_TYPE: str = "Unsupported index type: %s"; break;
685
case XT_ERR_INDEX_EXISTS: str = "Index '%s' already exists"; break;
686
case XT_ERR_INDEX_STRUC_EXISTS: str = "Index '%s' has an identical structure"; break;
687
case XT_ERR_INDEX_NOT_FOUND: str = "Index '%s' not found"; break;
688
case XT_ERR_INDEX_CORRUPT: str = "Cannot read index '%s'"; break;
689
case XT_ERR_TYPE_NOT_SUPPORTED: str = "Data type %s not supported"; break;
690
case XT_ERR_BAD_TABLE_VERSION: str = "Table `%s` version not supported, upgrade required"; break;
691
case XT_ERR_BAD_RECORD_FORMAT: str = "Record format unknown, either corrupted or upgrade required"; break;
692
case XT_ERR_BAD_EXT_RECORD: str = "Extended record part does not match reference"; break;
693
case XT_ERR_RECORD_CHANGED: str = "Record already updated, transaction aborted"; break;
694
case XT_ERR_XLOG_WAS_CORRUPTED: str = "Corrupted transaction log has been truncated"; break;
695
case XT_ERR_DUPLICATE_KEY: str = "Duplicate unique key"; break;
696
case XT_ERR_NO_DICTIONARY: str = "Table `%s` has not yet been opened by MySQL"; break;
697
case XT_ERR_TOO_MANY_TABLES: str = "Limit of %s tables per database exceeded"; break;
698
case XT_ERR_KEY_TOO_LARGE: str = "Index '%s' exceeds the key size limit of %s"; break;
699
case XT_ERR_MULTIPLE_DATABASES: str = "Multiple database in a single transaction is not permitted"; break;
700
case XT_ERR_NO_TRANSACTION: str = "Internal error: no transaction running"; break;
701
case XT_ERR_A_EXPECTED_NOT_B: str = "%s expected in place of %s"; break;
702
case XT_ERR_NO_MATCHING_INDEX: str = "Matching index required for '%s'"; break;
703
case XT_ERR_TABLE_LOCKED: str = "Table `%s` locked"; break;
704
case XT_ERR_NO_REFERENCED_ROW: str = "Constraint: `%s`"; break; // "Foreign key '%s', referenced row does not exist"
705
case XT_ERR_ROW_IS_REFERENCED: str = "Constraint: `%s`"; break; // "Foreign key '%s', has a referencing row"
706
case XT_ERR_BAD_DICTIONARY: str = "Internal dictionary does not match MySQL dictionary"; break;
707
case XT_ERR_LOADING_MYSQL_DIC: str = "Error loading %s.frm file, MySQL error: %s"; break;
708
case XT_ERR_COLUMN_IS_NOT_NULL: str = "Column `%s` is NOT NULL"; break;
709
case XT_ERR_INCORRECT_NO_OF_COLS: str = "Incorrect number of columns near %s"; break;
710
case XT_ERR_FK_ON_TEMP_TABLE: str = "Cannot create foreign key on temporary table"; break;
711
case XT_ERR_REF_TABLE_NOT_FOUND: str = "Referenced table `%s` not found"; break;
712
case XT_ERR_REF_TYPE_WRONG: str = "Incorrect data type on referenced column `%s`"; break;
713
case XT_ERR_DUPLICATE_FKEY: str = "Duplicate unique foreign key, contraint: %s"; break;
714
case XT_ERR_INDEX_FILE_TO_LARGE: str = "Index file has grown too large: %s"; break;
715
case XT_ERR_UPGRADE_TABLE: str = "Table `%s` must be upgraded from PBXT version %s"; break;
716
case XT_ERR_INDEX_NEW_VERSION: str = "Table `%s` index created by a newer version, upgrade required"; break;
717
case XT_ERR_LOCK_TIMEOUT: str = "Lock timeout on table `%s`"; break;
718
case XT_ERR_CONVERSION: str = "Error converting value for column `%s.%s`"; break;
719
case XT_ERR_NO_ROWS: str = "No matching row found in table `%s`"; break;
720
case XT_ERR_DATA_LOG_NOT_FOUND: str = "Data log not found: '%s'"; break;
721
case XT_ERR_LOG_MAX_EXCEEDED: str = "Maximum log count, %s, exceeded"; break;
722
case XT_ERR_MAX_ROW_COUNT: str = "Maximum row count reached"; break;
723
case XT_ERR_FILE_TOO_LONG: str = "File cannot be mapped, too large: '%s'"; break;
724
case XT_ERR_BAD_IND_BLOCK_SIZE: str = "Table `%s`, incorrect index block size: %s"; break;
725
case XT_ERR_INDEX_CORRUPTED: str = "Table `%s` index is corrupted, REPAIR TABLE required"; break;
726
case XT_ERR_NO_INDEX_CACHE: str = "Not enough index cache memory to handle concurrent updates"; break;
727
case XT_ERR_INDEX_LOG_CORRUPT: str = "Index log corrupt: '%s'"; break;
728
case XT_ERR_TOO_MANY_THREADS: str = "Too many threads: %s, increase pbxt_max_threads"; break;
729
case XT_ERR_TOO_MANY_WAITERS: str = "Too many waiting threads: %s"; break;
730
case XT_ERR_INDEX_OLD_VERSION: str = "Table `%s` index created by an older version, REPAIR TABLE required"; break;
731
case XT_ERR_PBXT_TABLE_EXISTS: str = "System table cannot be dropped because PBXT table still exists"; break;
732
case XT_ERR_SERVER_RUNNING: str = "A server is possibly already running"; break;
733
case XT_ERR_INDEX_MISSING: str = "Index file of table '%s' is missing"; break;
734
case XT_ERR_RECORD_DELETED: str = "Record was deleted"; break;
735
case XT_ERR_NEW_TYPE_OF_XLOG: str = "Transaction log %s, is using a newer format, upgrade required"; break;
736
case XT_ERR_NO_BEFORE_IMAGE: str = "Internal error: no before image"; break;
737
case XT_ERR_FK_REF_TEMP_TABLE: str = "Foreign key may not reference temporary table"; break;
738
case XT_ERR_FILE_OP_NOT_SUPP: str = "File operation not supported on this file: %s"; break;
739
case XT_ERR_INDEX_NOT_RECOVERED: str = "Index of table '%s' cannot be used because the table was not recovered"; break;
740
case XT_ERR_MYSQL_SHUTDOWN: str = "Cannot open table, MySQL has shutdown"; break;
741
case XT_ERR_MYSQL_NO_THREAD: str = "Cannot create thread, MySQL has shutdown"; break;
742
case XT_ERR_BUFFER_TOO_SMALL: str = "System backup buffer too small"; break;
743
case XT_ERR_BAD_BACKUP_FORMAT: str = "Unknown or corrupt backup format, restore aborted"; break;
744
case XT_ERR_PBXT_NOT_INSTALLED: str = "PBXT plugin is not installed"; break;
745
case XT_ERR_LOG_HEADER_CORRUPT: str = "Transaction log %s, file header corrupt"; break;
746
default: str = "Unknown XT error"; break;
751
xtPublic void xt_throw_i2xterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, c_char *item, c_char *item2)
753
xt_throwf(self, func, file, line, xt_err, 0, thr_get_err_string(xt_err), item, item2);
756
xtPublic void xt_throw_ixterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, c_char *item)
758
xt_throw_i2xterr(self, func, file, line, xt_err, item, NULL);
761
xtPublic void xt_throw_tabcolerr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item, c_char *item2)
763
char buffer[XT_TABLE_NAME_BUF_SIZE];
765
xt_tab_make_table_name(tab_item, buffer, sizeof(buffer));
766
xt_throw_i2xterr(self, func, file, line, xt_err, buffer, item2);
769
xtPublic void xt_throw_taberr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item)
771
char buffer[XT_TABLE_NAME_BUF_SIZE];
773
xt_tab_make_table_name(tab_item, buffer, sizeof(buffer));
774
xt_throw_ixterr(self, func, file, line, xt_err, buffer);
777
xtPublic void xt_throw_ulxterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, u_long value)
781
sprintf(buffer, "%lu", value);
782
xt_throw_ixterr(self, func, file, line, xt_err, buffer);
785
xtPublic void xt_throw_sulxterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, c_char *item, u_long value)
789
sprintf(buffer, "%lu", value);
790
xt_throw_i2xterr(self, func, file, line, xt_err, item, buffer);
793
xtPublic void xt_throw_xterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err)
795
xt_throw_ixterr(self, func, file, line, xt_err, NULL);
798
xtPublic void xt_throw_errno(XTThreadPtr self, c_char *func, c_char *file, u_int line, int err)
800
char err_msg[XT_SYS_ERR_SIZE];
802
xt_throw_error(self, func, file, line, XT_SYSTEM_ERROR, err, thr_get_sys_error(err, err_msg));
805
xtPublic void xt_throw_ferrno(XTThreadPtr self, c_char *func, c_char *file, u_int line, int err, c_char *path)
807
char err_msg[XT_SYS_ERR_SIZE];
809
xt_throwf(self, func, file, line, XT_SYSTEM_ERROR, err, "%s: '%s'", thr_get_sys_error(err, err_msg), path);
812
xtPublic void xt_throw_assertion(XTThreadPtr self, c_char *func, c_char *file, u_int line, c_char *str)
814
xt_throw_error(self, func, file, line, XT_ASSERTION_FAILURE, 0, str);
817
static void xt_log_assertion(XTThreadPtr self, c_char *func, c_char *file, u_int line, c_char *str)
819
xt_log_error(self, func, file, line, XT_LOG_DEFAULT, XT_ASSERTION_FAILURE, 0, str);
822
xtPublic void xt_throw_signal(XTThreadPtr self, c_char *func, c_char *file, u_int line, int sig)
827
sprintf(buffer, "Signal #%d", sig);
828
xt_throw_error(self, func, file, line, XT_SIGNAL_CAUGHT, sig, buffer);
830
xt_throw_error(self, func, file, line, XT_SIGNAL_CAUGHT, sig, strsignal(sig));
835
* -----------------------------------------------------------------------
836
* REGISTERING EXCEPTIONS
839
xtPublic void xt_registerf(c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *fmt, ...)
842
XTThreadPtr thread = xt_get_self();
845
thr_save_error_va(thread ? &thread->t_exception : NULL, thread, FALSE, func, file, line, xt_err, sys_err, fmt, ap);
849
xtPublic void xt_register_i2xterr(c_char *func, c_char *file, u_int line, int xt_err, c_char *item, c_char *item2)
851
xt_registerf(func, file, line, xt_err, 0, thr_get_err_string(xt_err), item, item2);
854
xtPublic void xt_register_ixterr(c_char *func, c_char *file, u_int line, int xt_err, c_char *item)
856
xt_register_i2xterr(func, file, line, xt_err, item, NULL);
859
xtPublic void xt_register_tabcolerr(c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item, c_char *item2)
861
char buffer[XT_TABLE_NAME_BUF_SIZE];
863
xt_tab_make_table_name(tab_item, buffer, sizeof(buffer));
864
xt_register_i2xterr(func, file, line, xt_err, buffer, item2);
867
xtPublic void xt_register_taberr(c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item)
869
char buffer[XT_TABLE_NAME_BUF_SIZE];
871
xt_tab_make_table_name(tab_item, buffer, sizeof(buffer));
872
xt_register_ixterr(func, file, line, xt_err, buffer);
875
xtPublic void xt_register_ulxterr(c_char *func, c_char *file, u_int line, int xt_err, u_long value)
879
sprintf(buffer, "%lu", value);
880
xt_register_ixterr(func, file, line, xt_err, buffer);
883
xtPublic xtBool xt_register_ferrno(c_char *func, c_char *file, u_int line, int err, c_char *path)
885
char err_msg[XT_SYS_ERR_SIZE];
887
xt_registerf(func, file, line, XT_SYSTEM_ERROR, err, "%s: '%s'", thr_get_sys_error(err, err_msg), path);
891
xtPublic void xt_register_error(c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *msg)
893
xt_registerf(func, file, line, xt_err, sys_err, "%s", msg);
896
xtPublic xtBool xt_register_errno(c_char *func, c_char *file, u_int line, int err)
898
char err_msg[XT_SYS_ERR_SIZE];
900
xt_register_error(func, file, line, XT_SYSTEM_ERROR, err, thr_get_sys_error(err, err_msg));
904
xtPublic void xt_register_xterr(c_char *func, c_char *file, u_int line, int xt_err)
906
xt_register_error(func, file, line, xt_err, 0, thr_get_err_string(xt_err));
910
* -----------------------------------------------------------------------
911
* CREATING EXCEPTIONS
914
xtPublic void xt_exceptionf(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *fmt, ...)
919
thr_save_error_va(e, self, FALSE, func, file, line, xt_err, sys_err, fmt, ap);
923
xtPublic void xt_exception_error(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *msg)
925
xt_exceptionf(e, self, func, file, line, xt_err, sys_err, "%s", msg);
928
xtPublic xtBool xt_exception_errno(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int err)
930
char err_msg[XT_SYS_ERR_SIZE];
932
xt_exception_error(e, self, func, file, line, XT_SYSTEM_ERROR, err, thr_get_sys_error(err, err_msg));
936
xtPublic void xt_exception_xterr(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err)
938
xt_exception_error(e, self, func, file, line, xt_err, 0, thr_get_err_string(xt_err));
942
* -----------------------------------------------------------------------
946
xtPublic void xt_log_errno(XTThreadPtr self, c_char *func, c_char *file, u_int line, int err)
950
xt_exception_errno(&e, self, func, file, line, err);
951
xt_log_exception(self, &e, XT_LOG_DEFAULT);
955
* -----------------------------------------------------------------------
956
* Assertions and failures (one breakpoints for all failures)
958
//#define CRASH_ON_ASSERT
960
xtPublic xtBool xt_assert(XTThreadPtr self, c_char *expr, c_char *func, c_char *file, u_int line)
964
//xt_set_fflush(TRUE);
966
break_in_assertion(expr, func, file, line);
967
#ifdef CRASH_ON_ASSERT
971
FatalAppExit(0, "Assertion Failed!");
974
xt_throw_assertion(self, func, file, line, expr);
979
xtPublic xtBool xt_assume(XTThreadPtr self, c_char *expr, c_char *func, c_char *file, u_int line)
981
xt_log_assertion(self, func, file, line, expr);
986
* -----------------------------------------------------------------------
987
* Create and destroy threads
990
typedef struct ThreadData {
993
void *(*td_start_routine)(XTThreadPtr self);
994
} ThreadDataRec, *ThreadDataPtr;
997
pthread_key(void *, thr_key);
999
static pthread_key_t thr_key;
1002
#ifdef HANDLE_SIGNALS
1003
static void thr_ignore_signal(int sig)
1008
static void thr_throw_signal(int sig)
1012
self = xt_get_self();
1015
/* The main thread will pass on a signal to all threads: */
1016
xt_signal_all_threads(self, sig);
1017
if (sig != SIGTERM) {
1018
if (self->t_disable_interrupts) {
1019
self->t_delayed_signal = sig;
1020
self->t_disable_interrupts = FALSE; /* Prevent infinite loop */
1023
self->t_delayed_signal = 0;
1024
xt_throw_signal(self, "thr_throw_signal", NULL, 0, sig);
1029
if (self->t_disable_interrupts) {
1030
self->t_delayed_signal = sig;
1031
self->t_disable_interrupts = FALSE; /* Prevent infinite loop */
1034
self->t_delayed_signal = 0;
1035
xt_throw_signal(self, "thr_throw_signal", NULL, 0, sig);
1040
static xtBool thr_setup_signals(void)
1042
struct sigaction action;
1044
sigemptyset(&action.sa_mask);
1045
action.sa_flags = 0;
1046
action.sa_handler = thr_ignore_signal;
1048
if (sigaction(SIGPIPE, &action, NULL) == -1)
1049
goto error_occurred;
1050
if (sigaction(SIGHUP, &action, NULL) == -1)
1051
goto error_occurred;
1053
action.sa_handler = thr_throw_signal;
1055
if (sigaction(SIGQUIT, &action, NULL) == -1)
1056
goto error_occurred;
1057
if (sigaction(SIGTERM, &action, NULL) == -1)
1058
goto error_occurred;
1060
if (sigaction(SIGILL, &action, NULL) == -1)
1061
goto error_occurred;
1062
if (sigaction(SIGBUS, &action, NULL) == -1)
1063
goto error_occurred;
1064
if (sigaction(SIGSEGV, &action, NULL) == -1)
1065
goto error_occurred;
1070
xt_log_errno(XT_NS_CONTEXT, errno);
1075
typedef void *(*ThreadMainFunc)(XTThreadPtr self);
1077
extern "C" void *xt_thread_main(void *data)
1079
ThreadDataPtr td = (ThreadDataPtr) data;
1080
XTThreadPtr self = td->td_thr;
1081
ThreadMainFunc start_routine;
1085
self->t_pthread = pthread_self();
1086
start_routine = td->td_start_routine;
1089
#ifdef HANDLE_SIGNALS
1090
if (!thr_setup_signals())
1095
if (!xt_set_key((pthread_key_t)thr_key, self, &self->t_exception))
1097
td->td_started = TRUE;
1098
return_data = (*start_routine)(self);
1101
xt_log_and_clear_exception(self);
1106
xt_free_thread(self);
1108
/* {MYSQL-THREAD-KILL}
1109
* Clean up any remaining MySQL thread!
1111
myxt_delete_remaining_thread();
1115
static void thr_free_data(XTThreadPtr self)
1117
if (self->t_free_data) {
1118
(*self->t_free_data)(self, self->t_data);
1119
self->t_data = NULL;
1123
xtPublic void xt_set_thread_data(XTThreadPtr self, void *data, XTThreadFreeFunc free_func)
1125
thr_free_data(self);
1126
self->t_free_data = free_func;
1127
self->t_data = data;
1130
static void thr_exit(XTThreadPtr self)
1132
/* Free the thread temporary data. */
1133
thr_free_resources(self, (XTResourcePtr) self->x.t_res_stack);
1134
xt_db_exit_thread(self);
1135
thr_free_data(self); /* Free custom user data. */
1137
if (self->t_id > 0) {
1138
ASSERT(self->t_id < xt_thr_current_max_threads);
1139
xt_lock_mutex(self, &thr_array_lock);
1140
pushr_(xt_unlock_mutex, &thr_array_lock);
1141
thr_accumulate_statistics(self);
1142
// No read resize lock required if I have the array lock
1143
xt_thr_array[self->t_id].td_thread = NULL;
1144
xt_thr_current_thread_count--;
1145
if (self->t_id+1 == xt_thr_current_max_threads) {
1146
/* We can reduce the current maximum,
1147
* this makes operations that scan the array faster!
1153
if (xt_thr_array[i].td_thread)
1159
xt_thr_current_max_threads = i+1;
1161
freer_(); // xt_unlock_mutex(&thr_array_lock)
1164
xt_free_cond(&self->t_cond);
1165
xt_free_mutex(&self->t_lock);
1167
self->st_tasks_todo.pl_exit();
1168
self->st_tasks_done.pl_exit();
1170
self->st_thread_list_count = 0;
1171
self->st_thread_list_size = 0;
1172
if (self->st_thread_list) {
1173
xt_free_ns(self->st_thread_list);
1174
self->st_thread_list = NULL;
1179
#define XT_THREAD_ARRAY_INC_SIZE 5
1181
#define XT_THREAD_ARRAY_INC_SIZE 100
1184
static xtBool thr_init_ns(XTThreadPtr new_thread)
1186
new_thread->t_res_top = (XTResourcePtr) new_thread->x.t_res_stack;
1188
new_thread->st_thread_list_count = 0;
1189
new_thread->st_thread_list_size = 0;
1190
new_thread->st_thread_list = NULL;
1192
if (!new_thread->st_tasks_todo.pl_init_ns())
1195
if (!new_thread->st_tasks_done.pl_init_ns())
1198
xt_init_cond(NULL, &new_thread->t_cond);
1199
xt_init_mutex_with_autoname(NULL, &new_thread->t_lock);
1201
xt_lock_mutex_ns(&thr_array_lock);
1203
ASSERT_NS(xt_thr_current_thread_count <= xt_thr_current_max_threads);
1204
ASSERT_NS(xt_thr_current_max_threads <= xt_thr_maximum_threads);
1205
if (xt_thr_current_thread_count == xt_thr_maximum_threads) {
1206
XTThreadDataRec *tmp;
1208
/* We can use a fixed thread ID because only one thread can do
1209
* this at any given time.
1211
* We use XT_SYS_THREAD_ID to avoid the problem that we this function
1212
* allocates thread IDs, but needs a thread ID to aquire the
1215
THR_ARRAY_WRITE_LOCK(&xt_thr_array_resize_lock, XT_SYS_THREAD_ID);
1216
if (!(tmp = (XTThreadDataRec *) realloc(xt_thr_array, (xt_thr_maximum_threads + XT_THREAD_ARRAY_INC_SIZE) * sizeof(XTThreadDataRec)))) {
1217
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, XT_SYS_THREAD_ID);
1222
xt_thr_maximum_threads += XT_THREAD_ARRAY_INC_SIZE;
1224
for (u_int i=xt_thr_maximum_threads - XT_THREAD_ARRAY_INC_SIZE; i<xt_thr_maximum_threads; i++)
1225
xt_thr_array[i].td_thread = NULL;
1227
for (u_int i=xt_thr_maximum_threads - XT_THREAD_ARRAY_INC_SIZE; i<xt_thr_maximum_threads; i++) {
1228
if (!(xt_thr_array[i].td_waiting = (XTWaitThreadPtr) xt_calloc_ns(sizeof(XTWaitThreadRec)))) {
1229
xt_thr_maximum_threads = i;
1230
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, XT_SYS_THREAD_ID);
1233
xt_init_mutex_with_autoname(NULL, &xt_thr_array[i].td_waiting->wt_lock);
1234
xt_init_cond(NULL, &xt_thr_array[i].td_waiting->wt_cond);
1235
xt_spinlock_init_with_autoname(NULL, &xt_thr_array[i].td_waiting->wt_wait_list_lock);
1238
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, XT_SYS_THREAD_ID);
1241
if (xt_thr_current_thread_count == xt_thr_current_max_threads) {
1242
new_thread->t_id = xt_thr_current_thread_count;
1243
xt_thr_array[new_thread->t_id].td_thread = new_thread;
1244
xt_thr_current_max_threads++;
1247
/* There must be a free slot: */
1248
for (u_int i=0; i<xt_thr_current_max_threads; i++) {
1249
if (!xt_thr_array[i].td_thread) {
1250
new_thread->t_id = i;
1251
xt_thr_array[i].td_thread = new_thread;
1256
xt_thr_current_thread_count++;
1258
xt_unlock_mutex_ns(&thr_array_lock);
1260
xt_db_init_thread_ns(new_thread);
1264
xt_unlock_mutex_ns(&thr_array_lock);
1267
thr_exit(new_thread);
1272
* The caller of this function automatically becomes the main thread.
1274
xtPublic XTThreadPtr xt_init_threading()
1276
volatile XTThreadPtr self = NULL;
1280
#ifdef HANDLE_SIGNALS
1281
if (!thr_setup_signals())
1285
xt_p_init_threading();
1287
err = pthread_key_create(&thr_key, NULL);
1289
xt_log_errno(XT_NS_CONTEXT, err);
1293
if ((err = xt_p_mutex_init_with_autoname(&thr_array_lock, NULL))) {
1294
xt_log_errno(XT_NS_CONTEXT, err);
1298
xt_thr_maximum_threads = 2;
1299
if (!(xt_thr_array = (XTThreadDataRec *) malloc(xt_thr_maximum_threads * sizeof(XTThreadDataRec)))) {
1300
xt_log_errno(XT_NS_CONTEXT, XT_ENOMEM);
1304
xt_thr_array[XT_NO_THREAD_ID].td_thread = (XTThreadPtr) 1; // Dummy, not used
1305
xt_thr_array[XT_NO_THREAD_ID].td_waiting = NULL;
1306
xt_thr_array[XT_SYS_THREAD_ID].td_thread = (XTThreadPtr) 1; // System ID, used for resizing the thread array
1307
xt_thr_array[XT_SYS_THREAD_ID].td_waiting = NULL;
1308
xt_thr_current_thread_count = XT_MIN_THREAD_ID;
1309
xt_thr_current_max_threads = XT_MIN_THREAD_ID;
1311
THR_ARRAY_INIT_LOCK(NULL, &xt_thr_array_resize_lock);
1313
/* Create the main thread: */
1314
self = xt_create_thread("MainThread", TRUE, FALSE, &e);
1316
xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
1321
XTThreadPtr thread = self;
1322
thr_list = xt_new_linkedlist(thread, NULL, NULL, TRUE);
1326
XTThreadPtr thread = self;
1327
xt_log_and_clear_exception(thread);
1328
xt_exit_threading(thread);
1335
xt_exit_threading(NULL);
1339
xtPublic void xt_exit_threading(XTThreadPtr self)
1342
xt_free_linkedlist(self, thr_list);
1346
/* This should be the main thread! */
1348
ASSERT(self->t_main);
1349
xt_free_thread(self);
1353
for (u_int i=XT_MIN_THREAD_ID; i<xt_thr_maximum_threads; i++) {
1354
xt_free_mutex(&xt_thr_array[i].td_waiting->wt_lock);
1355
xt_free_cond(&xt_thr_array[i].td_waiting->wt_cond);
1356
xt_spinlock_free(self, &xt_thr_array[i].td_waiting->wt_wait_list_lock);
1357
if (xt_thr_array[i].td_waiting->wt_wait_list)
1358
xt_free_ns(xt_thr_array[i].td_waiting->wt_wait_list);
1359
xt_free_ns(xt_thr_array[i].td_waiting);
1363
xt_thr_array = NULL;
1364
THR_ARRAY_FREE_LOCK(self, &xt_thr_array_resize_lock);
1365
xt_free_mutex(&thr_array_lock);
1368
xt_thr_current_thread_count = 0;
1369
xt_thr_current_max_threads = 0;
1371
/* I no longer delete 'thr_key' because
1372
* functions that call xt_get_self() after this
1373
* point will get junk back if we delete
1374
* thr_key. In particular the XT_THREAD_LOCK_INFO
1377
pthread_key_delete(thr_key);
1378
thr_key = (pthread_key_t) 0;
1383
xtPublic void xt_wait_for_all_threads(XTThreadPtr self)
1386
xt_ll_wait_till_empty(self, thr_list);
1390
* Call this function in a busy wait loop!
1391
* Use if for wait loops that are not
1394
xtPublic void xt_busy_wait(void)
1403
xtPublic void xt_critical_wait(void)
1405
/* NOTE: On Mac xt_busy_wait() works better than xt_yield()
1407
#if defined(XT_MAC) || defined(XT_WIN)
1416
* Use this for loops that time critical.
1417
* Time critical means we need to get going
1418
* as soon as possible!
1420
xtPublic void xt_yield(void)
1424
#elif defined(XT_MAC) || defined(XT_SOLARIS)
1426
#elif defined(XT_NETBSD)
1433
xtPublic void xt_sleep_milli_second(u_int t)
1442
xtPublic void xt_signal_all_threads(XTThreadPtr self, int sig)
1445
XTThreadPtr sig_thr;
1447
xt_ll_lock(self, thr_list);
1449
li = thr_list->ll_items;
1451
sig_thr = (XTThreadPtr) li;
1452
if (sig_thr != self)
1453
pthread_kill(sig_thr->t_pthread, sig);
1458
xt_ll_unlock(self, thr_list);
1462
xt_ll_unlock(self, thr_list);
1466
* Apply the given function to all threads except self!
1468
xtPublic void xt_do_to_all_threads(XTThreadPtr self, void (*do_func_ptr)(XTThreadPtr self, XTThreadPtr to_thr, void *thunk), void *thunk)
1473
xt_ll_lock(self, thr_list);
1474
pushr_(xt_ll_unlock, thr_list);
1476
li = thr_list->ll_items;
1478
to_thr = (XTThreadPtr) li;
1480
(*do_func_ptr)(self, to_thr, thunk);
1484
freer_(); // xt_ll_unlock(thr_list)
1487
xtPublic XTThreadPtr xt_get_self(void)
1491
/* First check if the handler has the data: */
1492
if ((self = myxt_get_self()))
1494
/* Then it must be a background process, and the
1495
* thread info is stored in the local key: */
1496
return (XTThreadPtr) xt_get_key((pthread_key_t)thr_key);
1499
xtPublic void xt_set_self(XTThreadPtr self)
1501
xt_set_key((pthread_key_t)thr_key, self, NULL);
1504
xtPublic void xt_clear_exception(XTThreadPtr thread)
1506
thread->t_exception.e_xt_err = 0;
1507
thread->t_exception.e_sys_err = 0;
1508
*thread->t_exception.e_err_msg = 0;
1509
*thread->t_exception.e_func_name = 0;
1510
*thread->t_exception.e_source_file = 0;
1511
thread->t_exception.e_source_line = 0;
1512
*thread->t_exception.e_catch_trace = 0;
1516
* Create a thread without requiring thread to do it (as in xt_create_daemon()).
1518
* This function returns NULL on error.
1520
xtPublic XTThreadPtr xt_create_thread(c_char *name, xtBool main_thread, xtBool user_thread, XTExceptionPtr e)
1522
volatile XTThreadPtr self;
1524
self = (XTThreadPtr) xt_calloc_ns(sizeof(XTThreadRec));
1526
xt_exception_errno(e, XT_CONTEXT, ENOMEM);
1530
if (!xt_set_key((pthread_key_t)thr_key, self, e)) {
1535
xt_strcpy(XT_THR_NAME_SIZE, self->t_name, name);
1536
self->t_main = main_thread;
1537
self->t_daemon = FALSE;
1539
if (!thr_init_ns(self)) {
1540
*e = self->t_exception;
1541
xt_set_key((pthread_key_t)thr_key, NULL, NULL);
1546
if (self && user_thread) {
1547
/* Add non-temporary threads to the thread list. */
1549
xt_ll_add(self, thr_list, &self->t_links, TRUE);
1552
*e = self->t_exception;
1553
xt_free_thread(self);
1563
* Create a daemon thread.
1565
xtPublic XTThreadPtr xt_create_daemon_ns(c_char *name)
1567
XTThreadPtr new_thread;
1569
/* NOTE: thr_key will be set when this thread start running. */
1570
if (!(new_thread = (XTThreadPtr) xt_calloc_ns(sizeof(XTThreadRec))))
1573
xt_strcpy(XT_THR_NAME_SIZE, new_thread->t_name, name);
1574
new_thread->t_main = FALSE;
1575
new_thread->t_daemon = TRUE;
1577
if (!thr_init_ns(new_thread)) {
1578
xt_free_ns(new_thread);
1585
xtPublic XTThreadPtr xt_create_daemon(XTThreadPtr self, c_char *name)
1587
XTThreadPtr new_thread;
1589
if (!(new_thread = xt_create_daemon_ns(name)))
1594
void xt_free_thread(XTThreadPtr self)
1597
if (!self->t_daemon && thr_list)
1598
xt_ll_remove(self, thr_list, &self->t_links, TRUE);
1599
/* Note, if I move this before thr_exit() then self = xt_get_self(); will fail in
1600
* xt_close_file_ns() which is called by xt_unuse_database()!
1604
* Do not clear the pthread's key value unless it is the same as the thread just released.
1605
* This can happen during shutdown when the engine is deregistered with the PBMS engine.
1607
* What happens is that during deregistration the PBMS engine calls close to close all
1608
* PBXT resources on all MySQL THDs created by PBMS for it's own pthreads. So the 'self'
1609
* being freed is not the same 'self' associated with the PBXT 'thr_key'.
1611
if (thr_key && (self == ((XTThreadPtr) xt_get_key((pthread_key_t)thr_key)))) {
1612
xt_set_key((pthread_key_t)thr_key, NULL, NULL);
1617
xtPublic xtBool xt_run_thread_ns(XTThreadPtr child, void *(*start_routine)(XTThreadPtr))
1621
pthread_t child_thread;
1623
// 'data' can be on the stack because we are waiting for the thread to start
1624
// before exiting the function.
1625
data.td_started = FALSE;
1626
data.td_thr = child;
1627
data.td_start_routine = start_routine;
1630
pthread_attr_t attr = { 0, 0, 0 };
1632
attr.priority = THREAD_PRIORITY_NORMAL;
1633
err = pthread_create(&child_thread, &attr, xt_thread_main, &data);
1636
err = pthread_create(&child_thread, NULL, xt_thread_main, &data);
1639
xt_free_thread(child);
1640
return xt_register_errno(XT_REG_CONTEXT, err);
1643
while (!data.td_started) {
1644
/* Check that the self is still alive: */
1645
if (pthread_kill(child_thread, 0))
1653
xtPublic void xt_run_thread(XTThreadPtr self, XTThreadPtr child, void *(*start_routine)(XTThreadPtr))
1655
if (!xt_run_thread_ns(child, start_routine))
1659
xtPublic void xt_exit_thread(XTThreadPtr self, void *result)
1661
xt_free_thread(self);
1662
pthread_exit(result);
1665
xtPublic void *xt_wait_for_thread_to_exit(xtThreadID tid, xtBool ignore_error)
1668
void *value_ptr = NULL;
1673
xt_lock_mutex_ns(&thr_array_lock);
1674
if (tid < xt_thr_maximum_threads) {
1675
// No resize lock required if I have the array lock.
1676
if ((thread = xt_thr_array[tid].td_thread)) {
1677
t1 = thread->t_pthread;
1681
xt_unlock_mutex_ns(&thr_array_lock);
1683
err = xt_p_join(t1, &value_ptr);
1684
if (err && !ignore_error)
1685
xt_log_errno(XT_NS_CONTEXT, err);
1691
* Kill the given thead, and wait for it to terminate.
1692
* This function just returns if the self is already dead.
1694
xtPublic void xt_kill_thread(pthread_t t1)
1697
void *value_ptr = NULL;
1699
err = pthread_kill(t1, SIGTERM);
1702
err = xt_p_join(t1, &value_ptr);
1704
xt_log_errno(XT_NS_CONTEXT, err);
1708
* -----------------------------------------------------------------------
1709
* Read/write locking
1712
#ifdef XT_THREAD_LOCK_INFO
1713
xtPublic xtBool xt_init_rwlock(XTThreadPtr self, xt_rwlock_type *rwlock, const char *name)
1715
xtPublic xtBool xt_init_rwlock(XTThreadPtr self, xt_rwlock_type *rwlock)
1720
#ifdef XT_THREAD_LOCK_INFO
1721
err = xt_p_rwlock_init_with_name(rwlock, NULL, name);
1723
err = xt_p_rwlock_init(rwlock, NULL);
1727
xt_throw_errno(XT_CONTEXT, err);
1733
xtPublic void xt_free_rwlock(xt_rwlock_type *rwlock)
1738
err = xt_p_rwlock_destroy(rwlock);
1739
if (err != XT_EBUSY)
1743
/* PMC - xt_xn_exit_db() is called even when xt_xn_init_db() is not fully completed!
1744
* This generates a lot of log entries. But I have no desire to only call
1745
* free for those articles that I have init'ed!
1747
xt_log_errno(XT_NS_CONTEXT, err);
1751
xtPublic xt_rwlock_type *xt_slock_rwlock(XTThreadPtr self, xt_rwlock_type *rwlock)
1756
err = xt_slock_rwlock_ns(rwlock);
1757
if (err != XT_EAGAIN)
1762
xt_throw_errno(XT_CONTEXT, err);
1768
xtPublic xt_rwlock_type *xt_xlock_rwlock(XTThreadPtr self, xt_rwlock_type *rwlock)
1773
err = xt_xlock_rwlock_ns(rwlock);
1774
if (err != XT_EAGAIN)
1780
xt_throw_errno(XT_CONTEXT, err);
1786
xtPublic void xt_unlock_rwlock(XTThreadPtr XT_UNUSED(self), xt_rwlock_type *rwlock)
1790
err = xt_unlock_rwlock_ns(rwlock);
1792
xt_log_errno(XT_NS_CONTEXT, err);
1796
* -----------------------------------------------------------------------
1800
xtPublic xt_mutex_type *xt_new_mutex(XTThreadPtr self)
1804
if (!(mx = (xt_mutex_type *) xt_calloc(self, sizeof(xt_mutex_type))))
1806
pushr_(xt_free, mx);
1807
if (!xt_init_mutex_with_autoname(self, mx)) {
1815
xtPublic void xt_delete_mutex(XTThreadPtr self, xt_mutex_type *mx)
1823
#ifdef XT_THREAD_LOCK_INFO
1824
xtPublic xtBool xt_init_mutex(XTThreadPtr self, xt_mutex_type *mx, const char *name)
1826
xtPublic xtBool xt_init_mutex(XTThreadPtr self, xt_mutex_type *mx)
1831
err = xt_p_mutex_init_with_name(mx, NULL, name);
1833
xt_throw_errno(XT_CONTEXT, err);
1839
void xt_free_mutex(xt_mutex_type *mx)
1844
err = xt_p_mutex_destroy(mx);
1845
if (err != XT_EBUSY)
1849
/* PMC - xt_xn_exit_db() is called even when xt_xn_init_db() is not fully completed!
1851
xt_log_errno(XT_NS_CONTEXT, err);
1855
xtPublic xtBool xt_lock_mutex(XTThreadPtr self, xt_mutex_type *mx)
1860
err = xt_lock_mutex_ns(mx);
1861
if (err != XT_EAGAIN)
1867
xt_throw_errno(XT_CONTEXT, err);
1873
xtPublic void xt_unlock_mutex(XTThreadPtr self, xt_mutex_type *mx)
1877
err = xt_unlock_mutex_ns(mx);
1879
xt_throw_errno(XT_CONTEXT, err);
1882
xtPublic xtBool xt_set_key(pthread_key_t key, const void *value, XTExceptionPtr e)
1885
my_pthread_setspecific_ptr(thr_key, (void *) value);
1889
err = pthread_setspecific(key, value);
1892
xt_exception_errno(e, XT_NS_CONTEXT, err);
1899
xtPublic void *xt_get_key(pthread_key_t key)
1902
return my_pthread_getspecific_ptr(void *, thr_key);
1904
return pthread_getspecific(key);
1908
xtPublic xt_cond_type *xt_new_cond(XTThreadPtr self)
1912
if (!(cond = (xt_cond_type *) xt_calloc(self, sizeof(xt_cond_type))))
1914
pushr_(xt_free, cond);
1915
if (!xt_init_cond(self, cond)) {
1923
xtPublic void xt_delete_cond(XTThreadPtr self, xt_cond_type *cond)
1927
xt_free(self, cond);
1931
xtPublic xtBool xt_init_cond(XTThreadPtr self, xt_cond_type *cond)
1935
err = pthread_cond_init(cond, NULL);
1937
xt_throw_errno(XT_CONTEXT, err);
1943
xtPublic void xt_free_cond(xt_cond_type *cond)
1948
err = pthread_cond_destroy(cond);
1949
if (err != XT_EBUSY)
1953
/* PMC - xt_xn_exit_db() is called even when xt_xn_init_db() is not fully completed!
1955
xt_log_errno(XT_NS_CONTEXT, err);
1959
xtPublic xtBool xt_throw_delayed_signal(XTThreadPtr self, c_char *func, c_char *file, u_int line)
1961
XTThreadPtr me = self ? self : xt_get_self();
1963
if (me->t_delayed_signal) {
1964
int sig = me->t_delayed_signal;
1966
me->t_delayed_signal = 0;
1967
xt_throw_signal(self, func, file, line, sig);
1973
xtPublic xtBool xt_wait_cond(XTThreadPtr self, xt_cond_type *cond, xt_mutex_type *mutex)
1976
XTThreadPtr me = self ? self : xt_get_self();
1978
/* PMC - In my tests, if I throw an exception from within the wait
1979
* the condition and the mutex remain locked.
1981
me->t_disable_interrupts = TRUE;
1982
err = xt_p_cond_wait(cond, mutex);
1983
me->t_disable_interrupts = FALSE;
1985
xt_throw_errno(XT_CONTEXT, err);
1988
if (me->t_delayed_signal) {
1989
xt_throw_delayed_signal(XT_CONTEXT);
1995
xtPublic xtBool xt_suspend(XTThreadPtr thread)
1999
// You can only suspend yourself.
2000
ASSERT_NS(pthread_equal(thread->t_pthread, pthread_self()));
2002
xt_lock_mutex_ns(&thread->t_lock);
2003
ok = xt_wait_cond(NULL, &thread->t_cond, &thread->t_lock);
2004
xt_unlock_mutex_ns(&thread->t_lock);
2008
xtPublic xtBool xt_unsuspend(XTThreadPtr target)
2010
return xt_broadcast_cond_ns(&target->t_cond);
2013
xtPublic void xt_lock_thread(XTThreadPtr thread)
2015
xt_lock_mutex_ns(&thread->t_lock);
2018
xtPublic void xt_unlock_thread(XTThreadPtr thread)
2020
xt_unlock_mutex_ns(&thread->t_lock);
2023
xtPublic xtBool xt_wait_thread(XTThreadPtr thread)
2025
return xt_wait_cond(NULL, &thread->t_cond, &thread->t_lock);
2028
xtPublic xtBool xt_timed_wait_thread(XTThreadPtr thread, u_long milli_sec)
2030
return xt_timed_wait_cond(NULL, &thread->t_cond, &thread->t_lock, milli_sec);
2033
xtPublic void xt_signal_thread(XTThreadPtr target)
2035
xt_broadcast_cond_ns(&target->t_cond);
2038
xtPublic void xt_terminate_thread(XTThreadPtr XT_UNUSED(self), XTThreadPtr target)
2040
target->t_quit = TRUE;
2041
target->t_delayed_signal = SIGTERM;
2044
xtPublic xtProcID xt_getpid()
2047
return GetCurrentProcessId();
2053
xtPublic xtBool xt_process_exists(xtProcID pid)
2062
h = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, pid);
2064
if (GetExitCodeProcess(h, &code)) {
2065
if (code == STILL_ACTIVE)
2073
err = HRESULT_CODE(GetLastError());
2074
if (err != ERROR_INVALID_PARAMETER)
2079
if (kill(pid, 0) == -1) {
2087
xtPublic xtBool xt_timed_wait_cond(XTThreadPtr self, xt_cond_type *cond, xt_mutex_type *mutex, u_long milli_sec)
2090
struct timespec abstime;
2091
XTThreadPtr me = self ? self : xt_get_self();
2096
GetSystemTimeAsFileTime(&now.ft);
2098
/* System time is measured in 100ns units.
2099
* This calculation will be reversed by the Windows implementation
2100
* of pthread_cond_timedwait(), in order to extract the
2101
* milli-second timeout!
2103
abstime.tv.i64 = now.i64 + (milli_sec * 10000);
2105
abstime.max_timeout_msec = milli_sec;
2110
/* Get the current time in microseconds: */
2111
gettimeofday(&now, NULL);
2112
micro_sec = (u_llong) now.tv_sec * (u_llong) 1000000 + (u_llong) now.tv_usec;
2114
/* Add the timeout which is in milli seconds */
2115
micro_sec += (u_llong) milli_sec * (u_llong) 1000;
2117
/* Setup the end time, which is in nano-seconds. */
2118
abstime.tv_sec = (long) (micro_sec / 1000000); /* seconds */
2119
abstime.tv_nsec = (long) ((micro_sec % 1000000) * 1000); /* and nanoseconds */
2122
me->t_disable_interrupts = TRUE;
2123
err = xt_p_cond_timedwait(cond, mutex, &abstime);
2124
me->t_disable_interrupts = FALSE;
2125
if (err && err != ETIMEDOUT) {
2126
xt_throw_errno(XT_CONTEXT, err);
2129
if (me->t_delayed_signal) {
2130
xt_throw_delayed_signal(XT_CONTEXT);
2136
xtPublic xtBool xt_signal_cond(XTThreadPtr self, xt_cond_type *cond)
2140
err = pthread_cond_signal(cond);
2142
xt_throw_errno(XT_CONTEXT, err);
2148
xtPublic void xt_broadcast_cond(XTThreadPtr self, xt_cond_type *cond)
2152
err = pthread_cond_broadcast(cond);
2154
xt_throw_errno(XT_CONTEXT, err);
2157
xtPublic xtBool xt_broadcast_cond_ns(xt_cond_type *cond)
2161
err = pthread_cond_broadcast(cond);
2163
xt_register_errno(XT_REG_CONTEXT, err);
2169
static int prof_setjmp_count = 0;
2171
xtPublic int prof_setjmp(void)
2173
prof_setjmp_count++;
2177
xtPublic void xt_set_low_priority(XTThreadPtr self)
2179
int err = xt_p_set_low_priority(self->t_pthread);
2181
self = NULL; /* Will cause logging, instead of throwing exception */
2182
xt_throw_errno(XT_CONTEXT, err);
2186
xtPublic void xt_set_normal_priority(XTThreadPtr self)
2188
int err = xt_p_set_normal_priority(self->t_pthread);
2190
self = NULL; /* Will cause logging, instead of throwing exception */
2191
xt_throw_errno(XT_CONTEXT, err);
2195
xtPublic void xt_set_high_priority(XTThreadPtr self)
2197
int err = xt_p_set_high_priority(self->t_pthread);
2199
self = NULL; /* Will cause logging, instead of throwing exception */
2200
xt_throw_errno(XT_CONTEXT, err);
2204
xtPublic void xt_set_priority(XTThreadPtr self, int priority)
2206
if (priority < XT_PRIORITY_NORMAL)
2207
xt_set_low_priority(self);
2208
else if (priority > XT_PRIORITY_NORMAL)
2209
xt_set_high_priority(self);
2211
xt_set_normal_priority(self);
2214
/* ----------------------------------------------------------------------
2219
* Add a thread the wakeup list of a thread.
2221
xtPublic xtBool xt_add_to_wakeup_list(xtThreadID waiting_id, xtThreadID wait_for_id)
2225
THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, waiting_id);
2226
wt = xt_thr_array[wait_for_id].td_waiting;
2227
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, waiting_id);
2228
xt_spinlock_lock(&wt->wt_wait_list_lock);
2229
if (wt->wt_wait_list_count == wt->wt_wait_list_size) {
2230
if (!xt_realloc_ns((void **) &wt->wt_wait_list, (wt->wt_wait_list_size+1) * sizeof(xtThreadID)))
2232
wt->wt_wait_list_size++;
2234
for (u_int i=0; i<wt->wt_wait_list_count; i++) {
2235
if (wt->wt_wait_list[i] == waiting_id)
2238
wt->wt_wait_list[wt->wt_wait_list_count] = waiting_id;
2239
wt->wt_wait_list_count++;
2241
xt_spinlock_unlock(&wt->wt_wait_list_lock);
2246
* Wakeup a single thread.
2248
// Depending on platform 'thread->t_id' may not be used by THR_ARRAY_READ_LOCK().
2249
xtPublic void xt_wakeup_thread(xtThreadID thd_id, XTThreadPtr thread __attribute__((unused)))
2251
XTWaitThreadPtr target_wt;
2253
THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
2254
target_wt = xt_thr_array[thd_id].td_waiting;
2255
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
2256
xt_lock_mutex_ns(&target_wt->wt_lock);
2257
xt_broadcast_cond_ns(&target_wt->wt_cond);
2258
xt_unlock_mutex_ns(&target_wt->wt_lock);
2262
* Wakeup a list of threads (the list is local to the calling thread).
2264
xtPublic void xt_wakeup_thread_list(XTThreadPtr thread)
2266
XTWaitThreadPtr target_wt;
2268
for (u_int i=0; i<thread->st_thread_list_count; i++) {
2269
THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
2270
target_wt = xt_thr_array[thread->st_thread_list[i]].td_waiting;
2271
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
2272
xt_lock_mutex_ns(&target_wt->wt_lock);
2273
xt_broadcast_cond_ns(&target_wt->wt_cond);
2274
xt_unlock_mutex_ns(&target_wt->wt_lock);
2276
thread->st_thread_list_count = 0;
2280
* Wakeup all threads waiting for this thread.
2282
xtPublic void xt_wakeup_waiting_threads(XTThreadPtr thread)
2285
XTWaitThreadPtr target_wt;
2287
THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
2288
wt = xt_thr_array[thread->t_id].td_waiting;
2289
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
2290
if (!wt->wt_wait_list_count)
2293
xt_spinlock_lock(&wt->wt_wait_list_lock);
2294
if (thread->st_thread_list_size < wt->wt_wait_list_count) {
2295
if (!xt_realloc_ns((void **) &thread->st_thread_list, wt->wt_wait_list_count * sizeof(xtThreadID)))
2297
thread->st_thread_list_size = wt->wt_wait_list_count;
2299
memcpy(thread->st_thread_list, wt->wt_wait_list, wt->wt_wait_list_count * sizeof(xtThreadID));
2300
thread->st_thread_list_count = wt->wt_wait_list_count;
2301
wt->wt_wait_list_count = 0;
2302
xt_spinlock_unlock(&wt->wt_wait_list_lock);
2304
xt_wakeup_thread_list(thread);
2308
for (u_int i=0; i<wt->wt_wait_list_count; i++) {
2309
THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
2310
target_wt = xt_thr_array[wt->wt_wait_list[i]].td_waiting;
2311
THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
2312
xt_lock_mutex_ns(&target_wt->wt_lock);
2313
xt_broadcast_cond_ns(&target_wt->wt_cond);
2314
xt_unlock_mutex_ns(&target_wt->wt_lock);
2316
wt->wt_wait_list_count = 0;
2317
xt_spinlock_unlock(&wt->wt_wait_list_lock);
2321
* -----------------------------------------------------------------------
2325
#ifdef DEBUG_NO_ACTIVITY
2326
static void debug_no_activity()
2328
xt_logf(XT_NT_INFO, "No activity!\n");
2331
time_t last_call_time;
2332
u_llong last_commit_total;
2333
xtWord8 last_rec_flush_time = (xtWord8) -1;
2334
xtWord8 last_rec_write_time = (xtWord8) -1;
2335
xtWord8 last_ind_flush_time = (xtWord8) -1;
2336
xtWord8 last_ind_write_time = (xtWord8) -1;
2337
xtWord8 last_ilog_flush_time = (xtWord8) -1;
2338
xtWord8 last_ilog_write_time = (xtWord8) -1;
2339
xtWord8 last_xlog_flush_time = (xtWord8) -1;
2340
xtWord8 last_xlog_write_time = (xtWord8) -1;
2341
xtWord8 last_data_flush_time = (xtWord8) -1;
2342
xtWord8 last_data_write_time = (xtWord8) -1;
2345
xtPublic void xt_gather_statistics(XTStatisticsPtr stats)
2347
XTThreadDataRec *thr_data;
2351
xt_lock_mutex_ns(&thr_array_lock);
2352
*stats = thr_statistics;
2353
// Ignore index 0, it is not used!
2354
// No read resize lock required if I have the array lock
2355
thr_data = &xt_thr_array[XT_MIN_THREAD_ID];
2356
for (u_int i=XT_MIN_THREAD_ID; i<xt_thr_current_max_threads; i++) {
2357
if ((thr = thr_data->td_thread)) {
2358
stats->st_commits += thr->st_statistics.st_commits;
2359
stats->st_rollbacks += thr->st_statistics.st_rollbacks;
2360
stats->st_stat_read += thr->st_statistics.st_stat_read;
2361
stats->st_stat_write += thr->st_statistics.st_stat_write;
2363
XT_ADD_STATS(stats->st_rec, thr->st_statistics.st_rec);
2364
if ((s = thr->st_statistics.st_rec.ts_flush_start))
2365
stats->st_rec.ts_flush_time += xt_trace_clock() - s;
2366
#ifdef XT_TIME_DISK_WRITES
2367
if ((s = thr->st_statistics.st_rec.ts_write_start))
2368
stats->st_rec.ts_write_time += xt_trace_clock() - s;
2370
#ifdef XT_TIME_DISK_READS
2371
if ((s = thr->st_statistics.st_rec.ts_read_start))
2372
stats->st_rec.ts_read_time += xt_trace_clock() - s;
2374
stats->st_rec_cache_hit += thr->st_statistics.st_rec_cache_hit;
2375
stats->st_rec_cache_miss += thr->st_statistics.st_rec_cache_miss;
2376
stats->st_rec_cache_frees += thr->st_statistics.st_rec_cache_frees;
2378
XT_ADD_STATS(stats->st_ind, thr->st_statistics.st_ind);
2379
if ((s = thr->st_statistics.st_ind.ts_flush_start))
2380
stats->st_ind.ts_flush_time += xt_trace_clock() - s;
2381
#ifdef XT_TIME_DISK_WRITES
2382
if ((s = thr->st_statistics.st_ind.ts_write_start))
2383
stats->st_ind.ts_write_time += xt_trace_clock() - s;
2385
#ifdef XT_TIME_DISK_READS
2386
if ((s = thr->st_statistics.st_ind.ts_read_start))
2387
stats->st_ind.ts_read_time += xt_trace_clock() - s;
2389
stats->st_ind_cache_hit += thr->st_statistics.st_ind_cache_hit;
2390
stats->st_ind_cache_miss += thr->st_statistics.st_ind_cache_miss;
2391
XT_ADD_STATS(stats->st_ilog, thr->st_statistics.st_ilog);
2392
if ((s = thr->st_statistics.st_ilog.ts_flush_start))
2393
stats->st_ilog.ts_flush_time += xt_trace_clock() - s;
2394
#ifdef XT_TIME_DISK_WRITES
2395
if ((s = thr->st_statistics.st_ilog.ts_write_start))
2396
stats->st_ilog.ts_write_time += xt_trace_clock() - s;
2398
#ifdef XT_TIME_DISK_READS
2399
if ((s = thr->st_statistics.st_ilog.ts_read_start))
2400
stats->st_ilog.ts_read_time += xt_trace_clock() - s;
2403
XT_ADD_STATS(stats->st_xlog, thr->st_statistics.st_xlog);
2404
if ((s = thr->st_statistics.st_xlog.ts_flush_start))
2405
stats->st_xlog.ts_flush_time += xt_trace_clock() - s;
2406
#ifdef XT_TIME_DISK_WRITES
2407
if ((s = thr->st_statistics.st_xlog.ts_write_start))
2408
stats->st_xlog.ts_write_time += xt_trace_clock() - s;
2410
#ifdef XT_TIME_DISK_READS
2411
if ((s = thr->st_statistics.st_xlog.ts_read_start))
2412
stats->st_xlog.ts_read_time += xt_trace_clock() - s;
2414
stats->st_xlog_cache_hit += thr->st_statistics.st_xlog_cache_hit;
2415
stats->st_xlog_cache_miss += thr->st_statistics.st_xlog_cache_miss;
2417
XT_ADD_STATS(stats->st_data, thr->st_statistics.st_data);
2418
if ((s = thr->st_statistics.st_data.ts_flush_start))
2419
stats->st_data.ts_flush_time += xt_trace_clock() - s;
2420
#ifdef XT_TIME_DISK_WRITES
2421
if ((s = thr->st_statistics.st_data.ts_write_start))
2422
stats->st_data.ts_write_time += xt_trace_clock() - s;
2424
#ifdef XT_TIME_DISK_READS
2425
if ((s = thr->st_statistics.st_data.ts_read_start))
2426
stats->st_data.ts_read_time += xt_trace_clock() - s;
2429
stats->st_scan_index += thr->st_statistics.st_scan_index;
2430
stats->st_scan_table += thr->st_statistics.st_scan_table;
2431
stats->st_row_select += thr->st_statistics.st_row_select;
2432
stats->st_row_insert += thr->st_statistics.st_row_insert;
2433
stats->st_row_update += thr->st_statistics.st_row_update;
2434
stats->st_row_delete += thr->st_statistics.st_row_delete;
2436
stats->st_wait_for_xact += thr->st_statistics.st_wait_for_xact;
2437
stats->st_retry_index_scan += thr->st_statistics.st_retry_index_scan;
2438
stats->st_reread_record_list += thr->st_statistics.st_reread_record_list;
2440
stats->st_ind_cache_dirty += thr->st_statistics.st_ind_cache_dirty;
2444
xt_unlock_mutex_ns(&thr_array_lock);
2446
#ifdef DEBUG_NO_ACTIVITY
2447
time_t now = time(NULL);
2449
/* Make sure at least 1 second has gone by: */
2450
if (!last_call_time || now > last_call_time) {
2451
if (last_commit_total &&
2452
last_commit_total == stats->st_commits &&
2453
last_rec_flush_time == stats->st_rec.ts_flush_time &&
2454
last_ind_flush_time == stats->st_ind.ts_flush_time &&
2455
last_ilog_flush_time == stats->st_ilog.ts_flush_time &&
2456
last_xlog_flush_time == stats->st_xlog.ts_flush_time &&
2457
last_data_flush_time == stats->st_data.ts_flush_time &&
2458
last_rec_write_time == stats->st_rec.ts_write_time &&
2459
last_ind_write_time == stats->st_ind.ts_write_time &&
2460
last_ilog_write_time == stats->st_ilog.ts_write_time &&
2461
last_xlog_write_time == stats->st_xlog.ts_write_time &&
2462
last_data_write_time == stats->st_data.ts_write_time
2464
debug_no_activity();
2466
last_call_time = now;
2467
last_commit_total = stats->st_commits;
2469
last_rec_flush_time = stats->st_rec.ts_flush_time;
2470
last_ind_flush_time = stats->st_ind.ts_flush_time;
2471
last_ilog_flush_time = stats->st_ilog.ts_flush_time;
2472
last_xlog_flush_time = stats->st_xlog.ts_flush_time;
2473
last_data_flush_time = stats->st_data.ts_flush_time;
2475
last_rec_write_time = stats->st_rec.ts_write_time;
2476
last_ind_write_time = stats->st_ind.ts_write_time;
2477
last_ilog_write_time = stats->st_ilog.ts_write_time;
2478
last_xlog_write_time = stats->st_xlog.ts_write_time;
2479
last_data_write_time = stats->st_data.ts_write_time;
2484
xtPublic int xt_get_index_cache_dirty_perc()
2486
XTThreadDataRec *thr;
2487
XTStatisticsRec stats;
2490
xt_lock_mutex_ns(&thr_array_lock);
2491
stats = thr_statistics;
2492
// Ignore index 0 and 1, it is not used!
2493
// No read resize lock required if I have the array lock
2494
thr = &xt_thr_array[XT_MIN_THREAD_ID];
2495
for (u_int i=XT_MIN_THREAD_ID; i<xt_thr_current_max_threads; i++) {
2497
stats.st_ind_cache_dirty += thr->td_thread->st_statistics.st_ind_cache_dirty;
2500
xt_unlock_mutex_ns(&thr_array_lock);
2501
v = (double) stats.st_ind_cache_dirty * (double) XT_INDEX_PAGE_SIZE;
2502
return (int) (v / (double) xt_ind_get_size() * (double) 100);
2505
static void thr_accumulate_statistics(XTThreadPtr self)
2507
thr_statistics.st_commits += self->st_statistics.st_commits;
2508
thr_statistics.st_rollbacks += self->st_statistics.st_rollbacks;
2509
thr_statistics.st_stat_read += self->st_statistics.st_stat_read;
2510
thr_statistics.st_stat_write += self->st_statistics.st_stat_write;
2512
XT_ADD_STATS(thr_statistics.st_rec, self->st_statistics.st_rec);
2513
thr_statistics.st_rec_cache_hit += self->st_statistics.st_rec_cache_hit;
2514
thr_statistics.st_rec_cache_miss += self->st_statistics.st_rec_cache_miss;
2515
thr_statistics.st_rec_cache_frees += self->st_statistics.st_rec_cache_frees;
2517
XT_ADD_STATS(thr_statistics.st_ind, self->st_statistics.st_ind);
2518
thr_statistics.st_ind_cache_hit += self->st_statistics.st_ind_cache_hit;
2519
thr_statistics.st_ind_cache_miss += self->st_statistics.st_ind_cache_miss;
2520
XT_ADD_STATS(thr_statistics.st_ilog, self->st_statistics.st_ilog);
2522
XT_ADD_STATS(thr_statistics.st_xlog, self->st_statistics.st_xlog);
2523
thr_statistics.st_xlog_cache_hit += self->st_statistics.st_xlog_cache_hit;
2524
thr_statistics.st_xlog_cache_miss += self->st_statistics.st_xlog_cache_miss;
2526
XT_ADD_STATS(thr_statistics.st_data, self->st_statistics.st_data);
2528
thr_statistics.st_scan_index += self->st_statistics.st_scan_index;
2529
thr_statistics.st_scan_table += self->st_statistics.st_scan_table;
2530
thr_statistics.st_row_select += self->st_statistics.st_row_select;
2531
thr_statistics.st_row_insert += self->st_statistics.st_row_insert;
2532
thr_statistics.st_row_update += self->st_statistics.st_row_update;
2533
thr_statistics.st_row_delete += self->st_statistics.st_row_delete;
2535
thr_statistics.st_wait_for_xact += self->st_statistics.st_wait_for_xact;
2536
thr_statistics.st_retry_index_scan += self->st_statistics.st_retry_index_scan;
2537
thr_statistics.st_reread_record_list += self->st_statistics.st_reread_record_list;
2539
thr_statistics.st_ind_cache_dirty += self->st_statistics.st_ind_cache_dirty;
2542
xtPublic u_llong xt_get_statistic(XTStatisticsPtr stats, XTDatabaseHPtr db, u_int rec_id)
2547
case XT_STAT_TIME_CURRENT:
2548
stat_value = (u_llong) time(NULL);
2550
case XT_STAT_TIME_PASSED:
2551
stat_value = (u_llong) xt_trace_clock();
2553
case XT_STAT_COMMITS:
2554
stat_value = stats->st_commits;
2556
case XT_STAT_ROLLBACKS:
2557
stat_value = stats->st_rollbacks;
2559
case XT_STAT_STAT_READS:
2560
stat_value = stats->st_stat_read;
2562
case XT_STAT_STAT_WRITES:
2563
stat_value = stats->st_stat_write;
2566
case XT_STAT_REC_BYTES_IN:
2567
stat_value = stats->st_rec.ts_read;
2569
case XT_STAT_REC_BYTES_OUT:
2570
stat_value = stats->st_rec.ts_write;
2572
case XT_STAT_REC_SYNC_COUNT:
2573
stat_value = stats->st_rec.ts_flush;
2575
case XT_STAT_REC_SYNC_TIME:
2576
stat_value = stats->st_rec.ts_flush_time;
2578
case XT_STAT_REC_CACHE_HIT:
2579
stat_value = stats->st_rec_cache_hit;
2581
case XT_STAT_REC_CACHE_MISS:
2582
stat_value = stats->st_rec_cache_miss;
2584
case XT_STAT_REC_CACHE_FREES:
2585
stat_value = stats->st_rec_cache_frees;
2587
case XT_STAT_REC_CACHE_USAGE:
2588
stat_value = (u_llong) xt_tc_get_usage();
2591
case XT_STAT_IND_BYTES_IN:
2592
stat_value = stats->st_ind.ts_read;
2594
case XT_STAT_IND_BYTES_OUT:
2595
stat_value = stats->st_ind.ts_write;
2597
case XT_STAT_IND_SYNC_COUNT:
2598
stat_value = stats->st_ind.ts_flush;
2600
case XT_STAT_IND_SYNC_TIME:
2601
stat_value = stats->st_ind.ts_flush_time;
2603
case XT_STAT_IND_CACHE_HIT:
2604
stat_value = stats->st_ind_cache_hit;
2606
case XT_STAT_IND_CACHE_MISS:
2607
stat_value = stats->st_ind_cache_miss;
2609
case XT_STAT_IND_CACHE_USAGE:
2610
stat_value = (u_llong) xt_ind_get_usage();
2612
case XT_STAT_ILOG_BYTES_IN:
2613
stat_value = stats->st_ilog.ts_read;
2615
case XT_STAT_ILOG_BYTES_OUT:
2616
stat_value = stats->st_ilog.ts_write;
2618
case XT_STAT_ILOG_SYNC_COUNT:
2619
stat_value = stats->st_ilog.ts_flush;
2621
case XT_STAT_ILOG_SYNC_TIME:
2622
stat_value = stats->st_ilog.ts_flush_time;
2625
case XT_STAT_XLOG_BYTES_IN:
2626
stat_value = stats->st_xlog.ts_read;
2628
case XT_STAT_XLOG_BYTES_OUT:
2629
stat_value = stats->st_xlog.ts_write;
2631
case XT_STAT_XLOG_SYNC_COUNT:
2632
stat_value = stats->st_xlog.ts_flush;
2634
case XT_STAT_XLOG_SYNC_TIME:
2635
stat_value = stats->st_xlog.ts_flush_time;
2637
case XT_STAT_XLOG_CACHE_HIT:
2638
stat_value = stats->st_xlog_cache_hit;
2640
case XT_STAT_XLOG_CACHE_MISS:
2641
stat_value = stats->st_xlog_cache_miss;
2643
case XT_STAT_XLOG_CACHE_USAGE:
2644
stat_value = (u_llong) xt_xlog_get_usage();
2647
case XT_STAT_DATA_BYTES_IN:
2648
stat_value = stats->st_data.ts_read;
2650
case XT_STAT_DATA_BYTES_OUT:
2651
stat_value = stats->st_data.ts_write;
2653
case XT_STAT_DATA_SYNC_COUNT:
2654
stat_value = stats->st_data.ts_flush;
2656
case XT_STAT_DATA_SYNC_TIME:
2657
stat_value = stats->st_data.ts_flush_time;
2660
case XT_STAT_BYTES_TO_CHKPNT:
2661
stat_value = db ? xt_bytes_since_last_checkpoint(db, db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset) : 0;
2663
case XT_STAT_LOG_BYTES_TO_WRITE:
2664
stat_value = db ? db->db_xlog.xl_log_bytes_written - db->db_xlog.xl_log_bytes_read : 0;//db->db_xlog.xlog_bytes_to_write();
2666
case XT_STAT_BYTES_TO_SWEEP:
2667
/* This stat is potentially very expensive: */
2668
stat_value = db ? xt_xn_bytes_to_sweep(db, xt_get_self()) : 0;
2670
case XT_STAT_WAIT_FOR_XACT:
2671
stat_value = stats->st_wait_for_xact;
2673
case XT_STAT_XACT_TO_CLEAN:
2674
#ifdef XT_SWEEPER_SORT_XACTS
2675
stat_value = db ? db->db_xn_curr_id + 1 - db->db_sw_to_add + db->db_sw_list_size : 0;
2677
stat_value = db ? db->db_xn_curr_id + 1 - db->db_xn_to_clean_id : 0;
2680
case XT_STAT_SWEEPER_WAITS:
2681
stat_value = db ? db->db_stat_sweep_waits : 0;
2684
case XT_STAT_SCAN_INDEX:
2685
stat_value = stats->st_scan_index;
2687
case XT_STAT_SCAN_TABLE:
2688
stat_value = stats->st_scan_table;
2690
case XT_STAT_ROW_SELECT:
2691
stat_value = stats->st_row_select;
2693
case XT_STAT_ROW_INSERT:
2694
stat_value = stats->st_row_insert;
2696
case XT_STAT_ROW_UPDATE:
2697
stat_value = stats->st_row_update;
2699
case XT_STAT_ROW_DELETE:
2700
stat_value = stats->st_row_delete;
2703
case XT_STAT_RETRY_INDEX_SCAN:
2704
stat_value = stats->st_retry_index_scan;
2706
case XT_STAT_REREAD_REC_LIST:
2707
stat_value = stats->st_reread_record_list;
2710
case XT_STAT_IND_CACHE_DIRTY:
2711
ASSERT_NS(stats->st_ind_cache_dirty >= 0);
2712
if (stats->st_ind_cache_dirty < 0)
2715
stat_value = stats->st_ind_cache_dirty;
2716
stat_value *= (u_llong) XT_INDEX_PAGE_SIZE;
2719
#ifdef XT_TIME_DISK_WRITES
2720
case XT_STAT_REC_WRITE_TIME:
2721
stat_value = stats->st_rec.ts_write_time;
2723
case XT_STAT_IND_WRITE_TIME:
2724
stat_value = stats->st_ind.ts_write_time;
2726
case XT_STAT_ILOG_WRITE_TIME:
2727
stat_value = stats->st_ilog.ts_write_time;
2729
case XT_STAT_XLOG_WRITE_TIME:
2730
stat_value = stats->st_xlog.ts_write_time;
2732
case XT_STAT_DATA_WRITE_TIME:
2733
stat_value = stats->st_data.ts_write_time;
2737
#ifdef XT_TIME_DISK_READS
2738
case XT_STAT_REC_READ_TIME:
2739
stat_value = stats->st_rec.ts_read_time;
2741
case XT_STAT_IND_READ_TIME:
2742
stat_value = stats->st_ind.ts_read_time;
2744
case XT_STAT_LOG_READ_TIME:
2745
stat_value = stats->st_ilog.ts_read_time + stats->st_xlog.ts_read_time + stats->st_data.ts_read_time;