1
/* Copyright (c) 2005 PrimeBase Technologies GmbH
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* 2005-01-15 Paul McCullagh
24
#include "xt_config.h"
34
#include "pthread_xt.h"
35
#include "hashtab_xt.h"
36
#include "filesys_xt.h"
37
#include "database_xt.h"
38
#include "memory_xt.h"
40
#include "datalog_xt.h"
41
#include "strutil_xt.h"
47
//#define XT_TEST_XACT_OVERFLOW
55
* -----------------------------------------------------------------------
59
// Module-level state for the database layer.
// Initialized here: pbxt_database, xt_db_rewrite_flushing, both open-database
// containers, the approximate timer, the next database ID (starts at 1) and
// the installation lock file handle. The remaining xt_db_* settings are only
// declared here; presumably they are assigned elsewhere -- TODO confirm.
xtPublic XTDatabaseHPtr pbxt_database = NULL; // The global open database
61
xtPublic xtLogOffset xt_db_log_file_threshold;
62
xtPublic size_t xt_db_log_buffer_size;
63
xtPublic size_t xt_db_transaction_buffer_size;
64
xtPublic size_t xt_db_checkpoint_frequency;
65
xtPublic off_t xt_db_data_log_threshold;
66
xtPublic size_t xt_db_data_file_grow_size;
67
xtPublic size_t xt_db_row_file_grow_size;
68
xtPublic size_t xt_db_record_write_threshold;
69
xtPublic int xt_db_garbage_threshold;
70
xtPublic int xt_db_log_file_count;
71
xtPublic int xt_db_auto_increment_mode; /* 0 = MySQL compatible, 1 = PrimeBase Compatible. */
72
xtPublic int xt_db_offline_log_function; /* 0 = recycle logs, 1 = delete logs, 2 = keep logs */
73
xtPublic int xt_db_sweeper_priority; /* 0 = low (default), 1 = normal, 2 = high */
74
/* Buggy at the moment.
75
* For example, a large alter table hangs.
77
xtPublic int xt_db_rewrite_flushing = 0; /* 0 = Normal fsync, 1 = Re-write flushing. */
78
xtPublic int xt_db_index_dirty_threshold;
79
xtPublic int xt_db_flush_log_at_trx_commit; /* 0 = no-write/no-flush, 1 = yes, 2 = write/no-flush */
81
xtPublic XTSortedListPtr xt_db_open_db_by_id = NULL;
82
xtPublic XTHashTabPtr xt_db_open_databases = NULL;
83
xtPublic time_t xt_db_approximate_time = 0; /* A "fast" alternative timer (not too accurate). */
85
static xtDatabaseID db_next_id = 1;
86
static volatile XTOpenFilePtr db_lock_file = NULL;
89
* -----------------------------------------------------------------------
90
* LOCK/UNLOCK INSTALLATION
93
// Locks the installation at installation_path for this server process.
//
// Visible steps:
//  1. Re-evaluates pbxt_crash_debug from marker files under the pbxt
//     directory: "no-debug" sets it FALSE, "crash-debug" sets it TRUE (tested
//     second, so it wins when both exist). A warning is logged about the state.
//  2. Opens (XT_FS_CREATE | XT_FS_MAKE_PATH) the file "pbxt-lock" directly
//     under installation_path, stores it in db_lock_file and tries to lock it.
//     If the lock fails, XT_ERR_SERVER_RUNNING is thrown.
//  3. Reads the lock file contents and parses them as a process ID.
//  4. Logs that recovery is required and, under XT_BACKUP_BEFORE_RECOVERY with
//     crash debugging on, copies the installation to a "-recovery-<i>" path.
//  5. Truncates the lock file and writes the current process ID into it.
//
// NOTE(review): the conditions guarding steps 3-4, the declarations of
// buffer/red_size/pid/extension/i, and the error handling around the final
// xt_close_file_ns() are not visible in these lines. Whether the
// xt_process_exists() check is active or part of the preceding comment also
// cannot be determined here -- confirm against the full file.
xtPublic void xt_lock_installation(XTThreadPtr self, char *installation_path)
95
char file_path[PATH_MAX];
99
// Remember the previous setting so a change can be reported below.
xtBool cd = pbxt_crash_debug;
101
xt_strcpy(PATH_MAX, file_path, installation_path);
102
xt_add_pbxt_file(PATH_MAX, file_path, "no-debug");
103
if (xt_fs_exists(file_path))
104
pbxt_crash_debug = FALSE;
105
xt_strcpy(PATH_MAX, file_path, installation_path);
106
xt_add_pbxt_file(PATH_MAX, file_path, "crash-debug");
107
if (xt_fs_exists(file_path))
108
pbxt_crash_debug = TRUE;
110
if (pbxt_crash_debug != cd) {
111
if (pbxt_crash_debug)
112
xt_logf(XT_NT_WARNING, "Crash debugging has been turned on ('crash-debug' file exists)\n");
114
xt_logf(XT_NT_WARNING, "Crash debugging has been turned off ('no-debug' file exists)\n");
116
else if (pbxt_crash_debug)
117
xt_logf(XT_NT_WARNING, "Crash debugging is enabled\n");
119
/* Moved the lock file out of the pbxt directory so that
120
* it is possible to drop the pbxt database!
122
xt_strcpy(PATH_MAX, file_path, installation_path);
123
xt_add_dir_char(PATH_MAX, file_path);
124
xt_strcat(PATH_MAX, file_path, "pbxt-lock");
125
db_lock_file = xt_open_file(self, file_path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 0);
128
if (!xt_lock_file(self, db_lock_file)) {
129
xt_logf(XT_NT_ERROR, "A server appears to already be running\n");
130
xt_logf(XT_NT_ERROR, "The file: %s, is locked\n", file_path);
131
xt_throw_xterr(XT_CONTEXT, XT_ERR_SERVER_RUNNING);
133
if (!xt_pread_file(db_lock_file, 0, 100, 0, buffer, &red_size, &self->st_statistics.st_rec, self))
136
buffer[red_size] = 0;
138
pid = (llong) _atoi64(buffer);
142
/* Problem with this code is, after a restart
143
* the process ID's are reused.
144
* If some system process grabs the proc id that
145
* the server had on the last run, then
146
* the database will not start.
147
if (xt_process_exists((xtProcID) pid)) {
148
xt_logf(XT_NT_ERROR, "A server appears to already be running, process ID: %lld\n", pid);
149
xt_logf(XT_NT_ERROR, "Remove the file: %s, if this is not the case\n", file_path);
150
xt_throw_xterr(XT_CONTEXT, XT_ERR_SERVER_RUNNING);
153
xt_logf(XT_NT_INFO, "The server was not shutdown correctly, recovery required\n");
154
#ifdef XT_BACKUP_BEFORE_RECOVERY
155
if (pbxt_crash_debug) {
156
/* The server was not shut down correctly. Make a backup before
162
xt_strcpy(PATH_MAX, file_path, installation_path);
163
xt_remove_dir_char(file_path);
164
sprintf(extension, "-recovery-%d", i);
165
xt_strcat(PATH_MAX, file_path, extension);
166
if (!xt_fs_exists(file_path))
169
xt_logf(XT_NT_INFO, "In order to reproduce recovery errors a backup of the installation\n");
170
xt_logf(XT_NT_INFO, "will be made to:\n");
171
xt_logf(XT_NT_INFO, "%s\n", file_path);
172
xt_logf(XT_NT_INFO, "Copy in progress...\n");
173
xt_fs_copy_dir(self, installation_path, file_path);
174
xt_logf(XT_NT_INFO, "Copy OK\n");
179
sprintf(buffer, "%lld", (llong) xt_getpid());
180
xt_set_eof_file(self, db_lock_file, 0);
181
if (!xt_pwrite_file(db_lock_file, 0, strlen(buffer), buffer, &self->st_statistics.st_rec, self))
185
xt_close_file_ns(db_lock_file);
192
// Releases the installation lock taken by xt_lock_installation():
// unlocks and closes db_lock_file, then deletes "<installation_path>/pbxt-lock".
// NOTE(review): any guard on db_lock_file being set is not visible here.
xtPublic void xt_unlock_installation(XTThreadPtr self, char *installation_path)
195
char lock_file[PATH_MAX];
197
xt_unlock_file(NULL, db_lock_file);
198
xt_close_file_ns(db_lock_file);
201
// Rebuild the same path that xt_lock_installation() used for the lock file.
xt_strcpy(PATH_MAX, lock_file, installation_path);
202
xt_add_dir_char(PATH_MAX, lock_file);
203
xt_strcat(PATH_MAX, lock_file, "pbxt-lock");
204
xt_fs_delete(self, lock_file);
208
// xt_bad_pointer is initialized to 0; xt_crash_me() stores 123 through it
// when pbxt_crash_debug is set, i.e. it performs a null-pointer write
// (presumably to force a crash for debugging).
int *xt_bad_pointer = 0;
210
void xt_crash_me(void)
212
if (pbxt_crash_debug)
213
*xt_bad_pointer = 123;
217
* -----------------------------------------------------------------------
221
// Hash-table comparison callback (case-sensitive variant).
// key:  a C string holding a database name.
// data: an XTDatabaseHPtr stored in the table.
// Returns non-zero when key equals the database's db_name (strcmp).
static xtBool db_hash_comp(void *key, void *data)
223
XTDatabaseHPtr db = (XTDatabaseHPtr) data;
225
return strcmp((char *) key, db->db_name) == 0;
228
// Hash-table hash callback (case-sensitive variant).
// Two return paths are visible: one hashes key_data itself as a string, the
// other hashes db_name of key_data viewed as a database object.
// NOTE(review): the selecting condition is not visible; presumably is_key
// chooses the string form -- confirm.
static xtHashValue db_hash(xtBool is_key, void *key_data)
230
XTDatabaseHPtr db = (XTDatabaseHPtr) key_data;
233
return xt_ht_hash((char *) key_data);
234
return xt_ht_hash(db->db_name);
237
// Hash-table comparison callback (case-insensitive variant).
// Returns non-zero when key (a C string) matches the database's db_name
// under strcasecmp.
static xtBool db_hash_comp_ci(void *key, void *data)
239
XTDatabaseHPtr db = (XTDatabaseHPtr) data;
241
return strcasecmp((char *) key, db->db_name) == 0;
244
// Hash-table hash callback (case-insensitive variant), using xt_ht_casehash.
// Two return paths are visible: one hashes key_data itself as a string, the
// other hashes db_name of key_data viewed as a database object.
// NOTE(review): the selecting condition is not visible; presumably is_key
// chooses the string form -- confirm.
static xtHashValue db_hash_ci(xtBool is_key, void *key_data)
246
XTDatabaseHPtr db = (XTDatabaseHPtr) key_data;
249
return xt_ht_casehash((char *) key_data);
250
return xt_ht_casehash(db->db_name);
253
// Hash-table free callback: releases the table's heap reference on the
// database object passed as data.
static void db_hash_free(XTThreadPtr self, void *data)
255
xt_heap_release(self, (XTDatabaseHPtr) data);
258
// Sorted-list comparison callback for xt_db_open_db_by_id.
// a: pointer to the xtDatabaseID being searched for.
// b: pointer to a list item, which is an XTDatabaseHPtr.
// Compares the searched ID against the item's db_id (equal, then less-than).
// NOTE(review): the return statements are not visible in these lines;
// presumably 0 / -1 / 1 in the usual comparator convention -- confirm.
static int db_cmp_db_id(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
260
xtDatabaseID db_id = *((xtDatabaseID *) a);
261
XTDatabaseHPtr *db_ptr = (XTDatabaseHPtr *) b;
263
if (db_id == (*db_ptr)->db_id)
265
if (db_id < (*db_ptr)->db_id)
270
// Creates the two global containers of open databases:
//  - xt_db_open_databases: hash table keyed by database name, built with the
//    case-insensitive callbacks when pbxt_ignore_case is set; the second
//    assignment (case-sensitive callbacks) is presumably the else branch.
//  - xt_db_open_db_by_id: sorted list of XTDatabaseHPtr ordered by
//    db_cmp_db_id.
xtPublic void xt_init_databases(XTThreadPtr self)
272
if (pbxt_ignore_case)
273
xt_db_open_databases = xt_new_hashtable(self, db_hash_comp_ci, db_hash_ci, db_hash_free, TRUE, TRUE);
275
xt_db_open_databases = xt_new_hashtable(self, db_hash_comp, db_hash, db_hash_free, TRUE, TRUE);
276
xt_db_open_db_by_id = xt_new_sortedlist(self, sizeof(XTDatabaseHPtr), 20, 10, db_cmp_db_id, NULL, NULL, FALSE, FALSE);
279
// Walks every entry of xt_db_open_db_by_id and shuts down that database's
// background threads: flusher, checkpointer, writer, sweeper, compactor and
// the pool threads. Before stopping, the visible code waits for the sweeper,
// the writer and the checkpointer.
// NOTE(review): how `sync` gates the wait calls, the declaration of len and
// the assignment of db from dbptr are not visible in these lines; presumably
// the waits run only when sync is TRUE -- confirm.
xtPublic void xt_stop_database_threads(XTThreadPtr self, xtBool sync)
282
XTDatabaseHPtr *dbptr;
283
XTDatabaseHPtr db = NULL;
285
// The list may not exist (it is set to NULL by xt_exit_databases()).
if (xt_db_open_db_by_id)
286
len = xt_sl_get_size(xt_db_open_db_by_id);
287
for (u_int i=0; i<len; i++) {
288
if ((dbptr = (XTDatabaseHPtr *) xt_sl_item_at(xt_db_open_db_by_id, i))) {
292
/* Wait for the sweeper: */
293
xt_wait_for_sweeper(self, db, 16);
295
/* Wait for the writer: */
296
xt_wait_for_writer(self, db);
298
/* Wait for the checkpointer: */
299
xt_wait_for_checkpointer(self, db);
301
xt_stop_flusher(self, db);
302
xt_stop_checkpointer(self, db);
303
xt_stop_writer(self, db);
304
xt_stop_sweeper(self, db);
305
xt_stop_compactor(self, db);
306
xt_db_stop_pool_threads(self, db);
311
// Frees the global open-database hash table and the by-ID sorted list, if
// they exist, and resets both globals to NULL so later checks see them as
// closed.
xtPublic void xt_exit_databases(XTThreadPtr self)
313
if (xt_db_open_databases) {
314
xt_free_hashtable(self, xt_db_open_databases);
315
xt_db_open_databases = NULL;
317
if (xt_db_open_db_by_id) {
318
xt_free_sortedlist(self, xt_db_open_db_by_id);
319
xt_db_open_db_by_id = NULL;
323
// Creates a database by making the directory `path` (xt_fs_mkdir).
xtPublic void xt_create_database(XTThreadPtr self, char *path)
325
xt_fs_mkdir(self, path);
328
// Finalizer for a database heap object (passed to xt_heap_new() in
// xt_get_database()). x is the XTDatabaseHPtr being destroyed.
//
// Visible order of work:
//  1. Stop the flusher, checkpointer, compactor, sweeper and writer, then
//     exit the database thread pool.
//  2. Remove the database from xt_db_open_db_by_id.
//  3. Tear down subsystems: open-table pool, index logs, data logs,
//     transactions, tables (see the ordering note in the comment below).
//  4. Free db_name, db_main_path (if set) and the db_init_sweep_lock mutex.
//
// NOTE(review): the four calls right after "This was the previous order of
// shutdown" are presumably part of that comment (its terminator is not
// visible here); the sequence that follows them is the current order.
static void db_finalize(XTThreadPtr self, void *x)
330
XTDatabaseHPtr db = (XTDatabaseHPtr) x;
332
xt_stop_flusher(self, db);
333
xt_stop_checkpointer(self, db);
334
xt_stop_compactor(self, db);
335
xt_stop_sweeper(self, db);
336
xt_stop_writer(self, db);
337
xt_db_thread_pool_exit(self, db);
339
xt_sl_delete(self, xt_db_open_db_by_id, &db->db_id);
341
* Important is that xt_db_pool_exit() is called
342
* before xt_xn_exit_db() because xt_xn_exit_db()
343
* frees the checkpoint information which
344
* may be required to shutdown the tables, which
345
* flushes tables, and therefore does a checkpoint.
347
/* This was the previous order of shutdown:
348
xt_xn_exit_db(self, db);
349
xt_dl_exit_db(self, db);
350
xt_db_pool_exit(self, db);
351
db->db_indlogs.ilp_exit(self);
354
xt_db_pool_exit(self, db);
355
db->db_indlogs.ilp_exit(self);
356
xt_dl_exit_db(self, db);
357
xt_xn_exit_db(self, db);
358
xt_tab_exit_db(self, db);
360
xt_free(self, db->db_name);
363
if (db->db_main_path) {
364
xt_free(self, db->db_main_path);
365
db->db_main_path = NULL;
367
xt_free_mutex(&db->db_init_sweep_lock);
370
// Release callback installed on database heap objects via
// xt_heap_set_release_callback() in xt_get_database(). The argument is
// unused; the callback signals the open-databases hash table if that table
// still exists.
static void db_onrelease(void *XT_UNUSED(x))
372
/* Signal threads waiting for exclusive use of the database: */
373
if (xt_db_open_databases) // The database may already be closed.
374
xt_ht_signal(NULL, xt_db_open_databases);
377
// Appends "<dir-char>pbxt<dir-char><file>" to path in place.
// size is the capacity of the path buffer, passed through to the bounded
// string helpers.
xtPublic void xt_add_pbxt_file(size_t size, char *path, const char *file)
379
xt_add_dir_char(size, path);
380
xt_strcat(size, path, "pbxt");
381
xt_add_dir_char(size, path);
382
xt_strcat(size, path, file);
385
// Appends "<dir-char>pbxt<dir-char>location" to path in place.
// size is the capacity of the path buffer.
xtPublic void xt_add_location_file(size_t size, char *path)
387
xt_add_dir_char(size, path);
388
xt_strcat(size, path, "pbxt");
389
xt_add_dir_char(size, path);
390
xt_strcat(size, path, "location");
393
// Appends "<dir-char>pbxt<dir-char>tables" to path in place.
// size is the capacity of the path buffer.
xtPublic void xt_add_tables_file(size_t size, char *path)
395
xt_add_dir_char(size, path);
396
xt_strcat(size, path, "pbxt");
397
xt_add_dir_char(size, path);
398
xt_strcat(size, path, "tables");
401
// Appends "<dir-char>pbxt" to path in place.
// size is the capacity of the path buffer.
xtPublic void xt_add_pbxt_dir(size_t size, char *path)
403
xt_add_dir_char(size, path);
404
xt_strcat(size, path, "pbxt");
407
// Appends "<dir-char>pbxt<dir-char>system" to path in place.
// size is the capacity of the path buffer.
xtPublic void xt_add_system_dir(size_t size, char *path)
409
xt_add_dir_char(size, path);
410
xt_strcat(size, path, "pbxt");
411
xt_add_dir_char(size, path);
412
xt_strcat(size, path, "system");
415
// Appends "<dir-char>pbxt<dir-char>data" to path in place.
// size is the capacity of the path buffer.
xtPublic void xt_add_data_dir(size_t size, char *path)
417
xt_add_dir_char(size, path);
418
xt_strcat(size, path, "pbxt");
419
xt_add_dir_char(size, path);
420
xt_strcat(size, path, "data");
424
* I have a problem here. I cannot rely on the path given to xt_get_database() to be
425
* consistent. When called from ha_create_table() the path is not modified.
426
* However when called from ha_open() the path is first transformed by a call to
427
* fn_format(). I have given an example from a stack trace below.
429
* In this case the odd path comes from the option:
430
* --tmpdir=/Users/build/Development/mysql/debug-mysql/mysql-test/var//tmp
432
* #3 0x001a3818 in ha_pbxt::create(char const*, st_table*, st_ha_create_information*)
433
* (this=0x2036898, table_path=0xf0060bd0 "/users/build/development/mysql/debug-my
434
* sql/mysql-test/var//tmp/#sql5718_1_0.frm", table_arg=0xf00601c0,
435
* create_info=0x2017410) at ha_pbxt.cc:2323
436
* #4 0x00140d74 in ha_create_table(char const*, st_ha_create_information*, bool)
437
* (name=0xf0060bd0 "/users/build/development/mysql/debug-mysql/mysql-te
438
* st/var//tmp/#sql5718_1_0.frm", create_info=0x2017410,
439
* update_create_info=false) at handler.cc:1387
441
* #4 0x0013f7a4 in handler::ha_open(char const*, int, int) (this=0x203ba98,
442
* name=0xf005eb70 "/users/build/development/mysql/debug-mysql/mysql-te
443
* st/var/tmp/#sql5718_1_1", mode=2, test_if_locked=2) at handler.cc:993
444
* #5 0x000cd900 in openfrm(char const*, char const*, unsigned, unsigned,
445
* unsigned, st_table*) (name=0xf005f260 "/users/build/development/mys
446
* ql/debug-mysql/mysql-test/var//tmp/#sql5718_1_1.frm",
447
* alias=0xf005fb90 "#sql-5718_1", db_stat=7, prgflag=44,
448
* ha_open_flags=0, outparam=0x2039e18) at table.cc:771
450
* As a result, I no longer use the entire path as the key to find a database.
451
* Just the last component of the path (i.e. the database name) should be
454
// Looks up, and if necessary opens, the database located at `path`.
//
// self:       calling thread; must have no database in use (asserted).
// path:       database directory; only its last component is used as the
//             lookup key (db_name), the full path is kept as db_main_path.
// multi_path: forces the database to be flagged multi-path; the flag is also
//             set when the pbxt "location" file exists under path.
//
// Visible flow, all under the xt_db_open_databases lock:
//  1. Look the name up in xt_db_open_databases.
//  2. Allocate a new XTDatabaseRec (finalizer db_finalize, release callback
//     db_onrelease), assign the next database ID, and initialize the pool,
//     tables, data logs, index logs, thread pool and transactions.
//  3. Register the database in xt_db_open_db_by_id, start the sweeper,
//     compactor, writer and checkpointer (and the flusher when
//     xt_db_flush_log_at_trx_commit is 0 or 2), then add it to the hash
//     table.
//  4. Detach anything recovery attached to this thread (xt_unuse_database)
//     and take a heap reference on db.
//
// NOTE(review): the condition under which step 2-4 runs (presumably "not
// found in the hash table"), the return statement, and whether the trailing
// xt_wait_for_sweeper() call is still active (the comment above it says it
// "has been moved") are not visible in these lines -- confirm.
xtPublic XTDatabaseHPtr xt_get_database(XTThreadPtr self, char *path, xtBool multi_path)
456
XTDatabaseHPtr db = NULL;
457
char db_path[PATH_MAX];
458
char db_name[NAME_MAX];
459
xtBool multi_path_db = FALSE;
461
/* A database may not be in use when this is called. */
462
ASSERT(!self->st_database);
463
xt_ht_lock(self, xt_db_open_databases);
464
pushr_(xt_ht_unlock, xt_db_open_databases);
466
// Probe for the "location" file to detect a multi-path database.
xt_strcpy(PATH_MAX, db_path, path);
467
xt_add_location_file(PATH_MAX, db_path);
468
if (multi_path || xt_fs_exists(db_path))
469
multi_path_db = TRUE;
471
// The lookup key is just the last directory component of the path.
xt_strcpy(PATH_MAX, db_path, path);
472
xt_remove_dir_char(db_path);
473
xt_strcpy(NAME_MAX, db_name, xt_last_directory_of_path(db_path));
475
db = (XTDatabaseHPtr) xt_ht_get(self, xt_db_open_databases, db_name);
477
pushsr_(db, xt_heap_release, (XTDatabaseHPtr) xt_heap_new(self, sizeof(XTDatabaseRec), db_finalize));
478
xt_heap_set_release_callback(db, db_onrelease);
479
xt_init_mutex_with_autoname(self, &db->db_init_sweep_lock);
480
db->db_id = db_next_id++;
481
db->db_name = xt_dup_string(self, db_name);
482
db->db_main_path = xt_dup_string(self, db_path);
483
db->db_multi_path = multi_path_db;
484
#ifdef XT_TEST_XACT_OVERFLOW
485
/* Test transaction ID overflow: */
486
db->db_xn_curr_id = 0xFFFFFFFF - 30;
488
xt_db_pool_init(self, db);
489
xt_tab_init_db(self, db);
490
xt_dl_init_db(self, db);
492
/* Initialize the index logs: */
493
db->db_indlogs.ilp_init(self, db, XT_INDEX_WRITE_BUFFER_SIZE);
495
/* Recover in xt_xn_init_db() may use background threads!: */
496
xt_db_thread_pool_init(self, db);
498
xt_xn_init_db(self, db);
499
xt_sl_insert(self, xt_db_open_db_by_id, &db->db_id, &db);
501
xt_start_sweeper(self, db);
502
xt_start_compactor(self, db);
503
xt_start_writer(self, db);
504
xt_start_checkpointer(self, db);
505
if (xt_db_flush_log_at_trx_commit == 0 || xt_db_flush_log_at_trx_commit == 2)
506
xt_start_flusher(self, db);
509
xt_ht_put(self, xt_db_open_databases, db);
511
/* The recovery process could attach parts of the open
512
* database to the thread!
514
xt_unuse_database(self, self);
516
xt_heap_reference(self, db);
519
/* {INDEX-RECOV_ROWID}
520
* Wait for sweeper to finish processing possibly
521
* unswept transactions after recovery.
522
* This is required because during recovery for
523
* all index entries written the row_id is set.
525
* When the row ID is set, this means that the row
526
* is "clean". i.e. visible to all transactions.
528
* Obviously this is not necessary the case for all
529
* index entries recovered. For example,
530
* transactions that still need to be swept may be
533
* As a result, we have to wait the the sweeper
534
* to complete. Only then can we be sure that
535
* all index entries that are not visible have
538
* REASON WHY WE SET ROWID ON RECOVERY:
539
* The row ID is set on recovery because the
540
* change to the index may be lost after a crash.
541
* The change to the index is done by the sweeper, and
542
* there is no record of this change in the log.
543
* The sweeper will not "re-sweep" all transations
544
* that are recovered. As a result, this upadte
545
* of the index by the sweeper may be lost.
547
* {OPEN-DB-SWEEPER-WAIT}
548
* This has been moved to after the release of the open
549
* database lock because:
551
* - We are waiting for the sweeper which may run out of
553
* - If it runs out of cache it well wait
554
* for the freeer thread.
555
* - For the freeer thread to be able to work it needs
556
* to open the database.
557
* - To open the database it needs the open database
561
* This has been moved, see: {WAIT-FOR-SW-AFTER-RECOV}
562
pushr_(xt_heap_release, db);
563
xt_wait_for_sweeper(self, db, 0);
570
// Finds an open database by its ID in xt_db_open_db_by_id while holding the
// xt_db_open_databases lock, and takes a heap reference on it when found.
// NOTE(review): the assignment of db from dbptr and the return statement
// are not visible in these lines; presumably db (NULL when not found) is
// returned and the caller owns the reference -- confirm.
xtPublic XTDatabaseHPtr xt_get_database_by_id(XTThreadPtr self, xtDatabaseID db_id)
572
XTDatabaseHPtr *dbptr;
573
XTDatabaseHPtr db = NULL;
575
xt_ht_lock(self, xt_db_open_databases);
576
pushr_(xt_ht_unlock, xt_db_open_databases);
577
if ((dbptr = (XTDatabaseHPtr *) xt_sl_find(self, xt_db_open_db_by_id, &db_id))) {
579
xt_heap_reference(self, db);
581
freer_(); // xt_ht_unlock(xt_db_open_databases)
585
// Drops an open database.
//
// Visible steps:
//  1. Under the xt_db_open_databases lock: stop all background daemons and
//     the thread pool, then remove the database from the hash table.
//  2. After releasing the lock: delete the transaction logs and data logs.
//  3. For every table path in db->db_table_paths, delete each directory
//     entry whose name ends in ".xtr", ".xtd", ".xti" or ".xt".
//  4. For a single-path database, remove the pbxt directory under
//     db_main_path; a failure there is logged and cleared, not propagated.
//
// The name is copied to a local buffer before xt_ht_del().
// NOTE(review): presumably because deleting the entry may free db->db_name
// -- confirm. Declarations of path, od and file are not visible here.
xtPublic void xt_drop_database(XTThreadPtr self, XTDatabaseHPtr db)
588
char db_name[NAME_MAX];
591
XTTablePathPtr *tp_ptr;
593
xt_ht_lock(self, xt_db_open_databases);
594
pushr_(xt_ht_unlock, xt_db_open_databases);
596
/* Shutdown the database daemons: */
597
xt_stop_flusher(self, db);
598
xt_stop_checkpointer(self, db);
599
xt_stop_sweeper(self, db);
600
xt_stop_compactor(self, db);
601
xt_stop_writer(self, db);
602
xt_db_thread_pool_exit(self, db);
604
/* Remove the database from the directory: */
605
xt_strcpy(NAME_MAX, db_name, db->db_name);
606
xt_ht_del(self, xt_db_open_databases, db_name);
608
/* Release the lock on the database directory: */
609
freer_(); // xt_ht_unlock(xt_db_open_databases)
611
/* Delete the transaction logs: */
612
xt_xlog_delete_logs(self, db);
614
/* Delete the data logs: */
615
xt_dl_delete_logs(self, db);
617
for (u_int i=0; i<xt_sl_get_size(db->db_table_paths); i++) {
619
tp_ptr = (XTTablePathPtr *) xt_sl_item_at(db->db_table_paths, i);
621
xt_strcpy(PATH_MAX, path, (*tp_ptr)->tp_path);
623
/* Delete all files in the database: */
624
pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
625
while (xt_dir_next(self, od)) {
626
file = xt_dir_name(self, od);
627
if (xt_ends_with(file, ".xtr") ||
628
xt_ends_with(file, ".xtd") ||
629
xt_ends_with(file, ".xti") ||
630
xt_ends_with(file, ".xt"))
632
// Build the full file path, delete it, then restore path to the directory.
xt_add_dir_char(PATH_MAX, path);
633
xt_strcat(PATH_MAX, path, file);
634
xt_fs_delete(self, path);
635
xt_remove_last_name_of_path(path);
638
freer_(); // xt_dir_close(od)
641
if (!db->db_multi_path) {
642
xt_strcpy(PATH_MAX, path, db->db_main_path);
643
xt_add_pbxt_dir(PATH_MAX, path);
644
if (!xt_fs_rmdir(NULL, path))
645
xt_log_and_clear_exception(self);
650
* Open/use a database.
652
// Opens the database at `path` and makes it the calling thread's current
// database (for XT_FOR_USER).
// The thread first drops whatever database it was using, then obtains the
// database handle, attaches to it with xt_use_database() and finally
// releases the handle reference it got from xt_get_database().
xtPublic void xt_open_database(XTThreadPtr self, char *path, xtBool multi_path)
656
/* We cannot get a database, without unusing the current
657
* first. The reason is that the restart process will
658
* partially set the current database!
660
xt_unuse_database(self, self);
661
db = xt_get_database(self, path, multi_path);
662
pushr_(xt_heap_release, db);
663
xt_use_database(self, db, XT_FOR_USER);
664
freer_(); // xt_heap_release(self, db);
667
/* This function can only be called if you do not already have a database in
668
* use. This is because to get a database pointer you are not allowed
669
* to have a database in use!
671
// Attaches `db` to the thread `self`.
// Throws XT_ERR_CANNOT_CHANGE_DB if the thread has transaction data or
// already has a database. Otherwise takes a heap reference on db, stores it
// in self->st_database and initializes the thread's transaction state with
// xt_xn_init_thread(self, what_for).
// Under XT_WAIT_FOR_CLEANUP the thread's previous-transaction slots are
// seeded with the database's current transaction ID.
xtPublic void xt_use_database(XTThreadPtr self, XTDatabaseHPtr db, int what_for)
673
/* Check if a transaction is in progress. If so,
674
* we cannot change the database!
676
if (self->st_xact_data || self->st_database)
677
xt_throw_xterr(XT_CONTEXT, XT_ERR_CANNOT_CHANGE_DB);
679
xt_heap_reference(self, db);
680
self->st_database = db;
681
#ifdef XT_WAIT_FOR_CLEANUP
682
self->st_last_xact = 0;
683
for (int i=0; i<XT_MAX_XACT_BEHIND; i++) {
684
self->st_prev_xact[i] = db->db_xn_curr_id;
687
xt_xn_init_thread(self, what_for);
690
// Detaches other_thr from its current database.
// self is the calling thread (used for waits, locking and the heap
// release); other_thr is the thread being detached, and callers in this
// file pass the same thread for both.
//
// Visible steps: wait for self's asynchronous tasks and drain their results,
// then, holding other_thr->t_lock, exit other_thr's transaction state and
// release its database reference (if any), clearing st_database.
// NOTE(review): the body of the result-draining loop, the declaration of tk
// and the release of t_lock are not visible in these lines.
xtPublic void xt_unuse_database(XTThreadPtr self, XTThreadPtr other_thr)
694
/* Wait for any asynchronous tasks to complete: */
695
xt_wait_for_async_tasks(self);
697
/* Free the results, if any: */
698
while ((tk = (XTTask *) xt_get_task_result(self)))
701
/* Abort the transaction if it belongs exclusively to this thread. */
702
xt_lock_mutex(self, &other_thr->t_lock);
703
pushr_(xt_unlock_mutex, &other_thr->t_lock);
705
xt_xn_exit_thread(other_thr);
706
if (other_thr->st_database) {
707
xt_heap_release(self, other_thr->st_database);
708
other_thr->st_database = NULL;
714
// Per-thread database-layer initialization.
// Only does work under XT_IMPLEMENT_NO_ACTION: zeroes the thread's restrict
// list and sets its item size to sizeof(XTRestrictItemRec).
// NOTE(review): the parameter is wrapped in XT_UNUSED() yet referenced in
// the conditional section -- check that this compiles when
// XT_IMPLEMENT_NO_ACTION is defined.
xtPublic void xt_db_init_thread_ns(XTThreadPtr XT_UNUSED(new_thread))
716
#ifdef XT_IMPLEMENT_NO_ACTION
717
memset(&new_thread->st_restrict_list, 0, sizeof(XTBasicListRec));
718
new_thread->st_restrict_list.bl_item_size = sizeof(XTRestrictItemRec);
722
// Per-thread database-layer cleanup: frees the restrict list (only under
// XT_IMPLEMENT_NO_ACTION) and detaches the thread from its database.
xtPublic void xt_db_exit_thread(XTThreadPtr self)
724
#ifdef XT_IMPLEMENT_NO_ACTION
725
xt_bl_free(NULL, &self->st_restrict_list);
727
xt_unuse_database(self, self);
731
* -----------------------------------------------------------------------
736
// Debug consistency check of the open-table pool's used-order list.
// Visible assertions: the entry at otp_mr_used has no ot_otp_mr_used link,
// the entry at otp_lr_used has no ot_otp_lr_used link, and a count (cnt)
// taken while following ot_otp_mr_used links equals otp_total_free.
// NOTE(review): the loop, the cnt bookkeeping and any NULL guards are not
// visible in these lines.
static void check_free_list(XTDatabaseHPtr db)
741
ot = db->db_ot_pool.otp_mr_used;
743
ASSERT_NS(!ot->ot_otp_mr_used);
744
ot = db->db_ot_pool.otp_lr_used;
746
ASSERT_NS(!ot->ot_otp_lr_used);
749
ot = ot->ot_otp_mr_used;
751
ASSERT_NS(cnt == db->db_ot_pool.otp_total_free);
755
// Initializes the database's open-table pool: zeroes the whole
// XTAllTablePoolsRec, then creates its mutex and condition variable.
xtPublic void xt_db_pool_init(XTThreadPtr self, XTDatabaseHPtr db)
757
memset(&db->db_ot_pool, 0, sizeof(XTAllTablePoolsRec));
758
xt_init_mutex_with_autoname(self, &db->db_ot_pool.opt_lock);
759
xt_init_cond(self, &db->db_ot_pool.opt_cond);
762
// Destroys the database's open-table pool.
// Frees the pool mutex and condition variable, then walks every hash bucket:
// for each per-table pool it closes the open tables on its free list (after
// assigning them to the calling thread) and frees the pool record.
// NOTE(review): the inner while loops and pointer advances (using tmp and
// tmp_ot) are not visible in these lines. Also note the mutex and condition
// are freed before the tables are closed; confirm nothing in
// xt_close_table() needs them.
xtPublic void xt_db_pool_exit(XTThreadPtr self, XTDatabaseHPtr db)
764
XTOpenTablePoolPtr table_pool, tmp;
765
XTOpenTablePtr ot, tmp_ot;
767
xt_free_mutex(&db->db_ot_pool.opt_lock);
768
xt_free_cond(&db->db_ot_pool.opt_cond);
770
for (u_int i=0; i<XT_OPEN_TABLE_POOL_HASH_SIZE; i++) {
771
table_pool = db->db_ot_pool.otp_hash[i];
773
// Save the successors before the current entries are closed and freed.
tmp = table_pool->opt_next_hash;
774
ot = table_pool->opt_free_list;
776
tmp_ot = ot->ot_otp_next_free;
777
ot->ot_thread = self;
778
xt_close_table(ot, TRUE, FALSE);
781
xt_free(self, table_pool);
787
// Finds the per-table open-table pool for tab_id in the database's hash
// (bucket = tab_id % XT_OPEN_TABLE_POOL_HASH_SIZE), creating it if needed.
// A new pool is allocated with xt_malloc_ns(), initialized as unlocked with
// no open tables and an empty free list, and linked at the head of its
// bucket.
// NOTE(review): the chain-walk loop and the return statements are not
// visible; callers in this file test the result for NULL, so presumably NULL
// is returned when the allocation fails -- confirm. Callers that are visible
// here take db->db_ot_pool.opt_lock before calling.
static XTOpenTablePoolPtr db_get_open_table_pool(XTDatabaseHPtr db, xtTableID tab_id)
789
XTOpenTablePoolPtr table_pool;
792
hash = tab_id % XT_OPEN_TABLE_POOL_HASH_SIZE;
793
table_pool = db->db_ot_pool.otp_hash[hash];
795
if (table_pool->opt_tab_id == tab_id)
797
table_pool = table_pool->opt_next_hash;
800
if (!(table_pool = (XTOpenTablePoolPtr) xt_malloc_ns(sizeof(XTOpenTablePoolRec))))
803
table_pool->opt_db = db;
804
table_pool->opt_tab_id = tab_id;
805
table_pool->opt_total_open = 0;
806
table_pool->opt_locked = XT_TABLE_NOT_LOCKED;
807
table_pool->opt_free_list = NULL;
808
// Insert at the head of the bucket chain.
table_pool->opt_next_hash = db->db_ot_pool.otp_hash[hash];
809
db->db_ot_pool.otp_hash[hash] = table_pool;
814
// Frees a per-table pool record, but only when it is neither locked nor has
// any open tables; otherwise nothing visible happens.
// The record is located in its hash bucket, unlinked (either from its
// predecessor pptr or from the bucket head) and then freed.
// NOTE(review): the search loop that maintains pptr is not visible in these
// lines. Callers in this file pass NULL for self.
static void db_free_open_table_pool(XTThreadPtr self, XTOpenTablePoolPtr table_pool)
816
if (!table_pool->opt_locked && !table_pool->opt_total_open) {
817
XTOpenTablePoolPtr ptr, pptr = NULL;
820
hash = table_pool->opt_tab_id % XT_OPEN_TABLE_POOL_HASH_SIZE;
821
ptr = table_pool->opt_db->db_ot_pool.otp_hash[hash];
823
if (ptr == table_pool)
826
ptr = ptr->opt_next_hash;
829
if (ptr == table_pool) {
831
pptr->opt_next_hash = table_pool->opt_next_hash;
833
table_pool->opt_db->db_ot_pool.otp_hash[hash] = table_pool->opt_next_hash;
836
xt_free(self, table_pool);
840
// Takes the exclusive lock on the open-table pool of table tab_id.
//
// Visible phases:
//  1. Under db_ot_pool.opt_lock: get (or create) the pool and wait, in
//     2000-unit timed waits, until no one else holds it locked; the pool is
//     re-fetched after every wait.
//  2. Mark the pool XT_TABLE_LOCK_WAITING, then XT_TABLE_LOCK_FLUSHING,
//     release the mutex, and flush the table through a pooled open table
//     obtained as a background user (see {FLUSH-BUG}).
//  3. Re-take the mutex, close every open table on the pool's free list
//     (unlinking each from the used-order list and updating the counters),
//     then wait until opt_total_open drops to zero.
//  4. Mark the pool XT_TABLE_LOCKED and release the mutex.
//
// NOTE(review): the use of flush_table to gate phase 2, the loop over the
// free list, the error paths after a failed db_get_open_table_pool() and the
// return statement are not visible in these lines; presumably the locked
// pool is returned -- confirm.
static XTOpenTablePoolPtr db_lock_table_pool(XTThreadPtr self, XTDatabaseHPtr db, xtTableID tab_id, xtBool flush_table)
842
XTOpenTablePoolPtr table_pool;
843
XTOpenTablePtr ot, tmp_ot;
845
xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
846
pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
848
if (!(table_pool = db_get_open_table_pool(db, tab_id)))
851
/* Wait for the lock: */
852
while (table_pool->opt_locked) {
853
xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
854
if (!(table_pool = db_get_open_table_pool(db, tab_id)))
858
/* Enter locking phase 1: */
859
table_pool->opt_locked = XT_TABLE_LOCK_WAITING;
862
/* Don't know if this is interesting as a phase, but anyway... */
863
table_pool->opt_locked = XT_TABLE_LOCK_FLUSHING;
864
freer_(); // xt_unlock_mutex(db_ot_pool.opt_lock)
866
pushr_(xt_db_unlock_table_pool, table_pool);
867
/* During this time, background processes can use the
870
* May also do a flush, but this is now taken care
871
* of here {FLUSH-BUG}
873
if ((ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE))) {
874
pushr_(xt_db_return_table_to_pool, ot);
875
xt_sync_flush_table(self, ot, 0);
876
freer_(); //xt_db_return_table_to_pool_foreground(ot);
879
popr_(); // Discard xt_db_unlock_table_pool_no_lock(table_pool)
881
xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
882
pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
885
/* Free all open tables not in use: */
886
ot = table_pool->opt_free_list;
887
table_pool->opt_free_list = NULL;
889
tmp_ot = ot->ot_otp_next_free;
891
/* Remove from MRU list: */
892
if (db->db_ot_pool.otp_lr_used == ot)
893
db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
894
if (db->db_ot_pool.otp_mr_used == ot)
895
db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
896
if (ot->ot_otp_lr_used)
897
ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
898
if (ot->ot_otp_mr_used)
899
ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
901
if (db->db_ot_pool.otp_lr_used)
902
db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
904
ASSERT_NS(db->db_ot_pool.otp_total_free > 0);
905
db->db_ot_pool.otp_total_free--;
907
/* Close the table: */
908
ASSERT(table_pool->opt_total_open > 0);
909
table_pool->opt_total_open--;
911
ot->ot_thread = self;
912
xt_close_table(ot, table_pool->opt_total_open == 0, FALSE);
914
/* Go to the next: */
918
/* Wait for other to close: */
919
while (table_pool->opt_total_open > 0) {
920
xt_timed_wait_cond_ns(&db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
923
/* 2nd phase, now the table is really locked: */
924
table_pool->opt_locked = XT_TABLE_LOCKED;
926
freer_(); // xt_unlock_mutex(db_ot_pool.opt_lock)
931
* This function locks a particular table by locking the table directory
932
* and waiting for all open tables handles to close.
934
* Things are a bit complicated because the sweeper must be turned off before
935
* the table directory is locked.
937
// Locks the open-table pool of the table named tab_name.
// The table is obtained with xt_use_table() (honoring no_load and
// missing_ok), its tab_id is read, and db_lock_table_pool() is called for
// that ID.
// Two exits are visible: one keeps the table reference (the pending release
// is discarded with popr_()), and one releases the table before locking and
// returns the pool directly.
// NOTE(review): the handling of a missing table, the use of ret_tab (it
// presumably receives the retained table handle) and the branch that picks
// between the two exits are not visible in these lines -- confirm.
xtPublic XTOpenTablePoolPtr xt_db_lock_table_pool_by_name(XTThreadPtr self, XTDatabaseHPtr db, XTPathStrPtr tab_name, xtBool no_load, xtBool flush_table, xtBool missing_ok, XTTableHPtr *ret_tab)
939
XTOpenTablePoolPtr table_pool;
943
pushsr_(tab, xt_heap_release, xt_use_table(self, tab_name, no_load, missing_ok));
945
freer_(); // xt_heap_release(tab)
949
tab_id = tab->tab_id;
953
table_pool = db_lock_table_pool(self, db, tab_id, flush_table);
954
popr_(); // Discard xt_heap_release(tab)
958
freer_(); // xt_heap_release(tab)
959
return db_lock_table_pool(self, db, tab_id, flush_table);
962
// Releases a table pool lock taken by db_lock_table_pool().
// Under db_ot_pool.opt_lock: marks the pool XT_TABLE_NOT_LOCKED, wakes all
// waiters on the pool condition, and frees the pool record if it is now
// unused (db_free_open_table_pool only frees an unlocked pool with no open
// tables).
// NOTE(review): any guard for a NULL table_pool and the declaration of db
// are not visible in these lines.
xtPublic void xt_db_unlock_table_pool(XTThreadPtr self, XTOpenTablePoolPtr table_pool)
969
db = table_pool->opt_db;
970
xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
971
pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
973
table_pool->opt_locked = XT_TABLE_NOT_LOCKED;
974
xt_broadcast_cond(self, &db->db_ot_pool.opt_cond);
975
db_free_open_table_pool(NULL, table_pool);
977
freer_(); // xt_unlock_mutex(db_ot_pool.opt_lock)
980
// Gets an open-table handle for `tab` on behalf of `thread`, using the
// non-throwing (_ns) mutex and wait calls.
//
// Visible flow, under db_ot_pool.opt_lock:
//  1. Get (or create) the pool for tab->tab_id and wait while it is locked,
//     re-fetching the pool after each timed wait.
//  2. If the pool has a free open table, take it off the free list and the
//     used-order list, decrement otp_total_free and assign it to `thread`.
//  3. Otherwise open a new table with xt_open_table(); on success assign it
//     to `thread` and increment opt_total_open.
//  4. db_free_open_table_pool() and the mutex unlock appear on two exit
//     paths.
// NOTE(review): the return statements and the exact failure handling are not
// visible in these lines; presumably the handle is returned on success and
// NULL on failure -- confirm.
xtPublic XTOpenTablePtr xt_db_open_table_using_tab(XTTableHPtr tab, XTThreadPtr thread)
982
XTDatabaseHPtr db = tab->tab_db;
983
XTOpenTablePoolPtr table_pool;
986
xt_lock_mutex_ns(&db->db_ot_pool.opt_lock);
988
if (!(table_pool = db_get_open_table_pool(db, tab->tab_id)))
991
while (table_pool->opt_locked) {
992
if (!xt_timed_wait_cond_ns(&db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000))
994
if (!(table_pool = db_get_open_table_pool(db, tab->tab_id)))
998
if ((ot = table_pool->opt_free_list)) {
999
/* Remove from the free list: */
1000
table_pool->opt_free_list = ot->ot_otp_next_free;
1002
/* Remove from MRU list: */
1003
if (db->db_ot_pool.otp_lr_used == ot)
1004
db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
1005
if (db->db_ot_pool.otp_mr_used == ot)
1006
db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
1007
if (ot->ot_otp_lr_used)
1008
ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
1009
if (ot->ot_otp_mr_used)
1010
ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
1012
if (db->db_ot_pool.otp_lr_used)
1013
db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
1015
ASSERT_NS(db->db_ot_pool.otp_total_free > 0);
1016
db->db_ot_pool.otp_total_free--;
1018
ot->ot_thread = thread;
1022
if ((ot = xt_open_table(tab))) {
1023
ot->ot_thread = thread;
1024
table_pool->opt_total_open++;
1028
db_free_open_table_pool(NULL, table_pool);
1029
xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1033
db_free_open_table_pool(NULL, table_pool);
1036
xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1040
// Wrapper around xt_db_open_pool_table() that reports through an xtBool
// result and an out-parameter.
// Uses the current thread (xt_get_self()) and stores the handle opened for
// tab_id in *ret_ot, calling as a foreground user (i_am_background = FALSE).
// NOTE(review): the exception-catching scaffolding and the return values are
// not visible in these lines; by the _ns naming it presumably returns FALSE
// instead of throwing -- confirm.
xtPublic xtBool xt_db_open_pool_table_ns(XTOpenTablePtr *ret_ot, XTDatabaseHPtr db, xtTableID tab_id)
1042
XTThreadPtr self = xt_get_self();
1046
*ret_ot = xt_db_open_pool_table(self, db, tab_id, NULL, FALSE);
1055
xtPublic XTOpenTablePtr xt_db_open_pool_table(XTThreadPtr self, XTDatabaseHPtr db, xtTableID tab_id, int *result, xtBool i_am_background)
1058
XTOpenTablePoolPtr table_pool;
1061
xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
1062
pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
1064
if (!(table_pool = db_get_open_table_pool(db, tab_id)))
1067
/* Background processes do not have to wait while flushing!
1069
* I think I did this so that the background process would
1070
* not hang during flushing. Exact reason currently
1071
* unknown (maybe not any more see {HANG-ON-FREEER} below).
1073
* NOTE: I have taken out check:
1074
* && !(i_am_background && table_pool->opt_flushing) below.
1075
* To which the text above refers. The reason is because it
1076
* has been replaced by the more general rule (described
1077
* below). This was used to ensure that background
1078
* threads do not hang when the table is flushed
1079
* in db_lock_table_pool(). Now (see below) I make
1080
* sure that a background thread does not hang during a
1081
* lock table, as long as the locker is still waiting!
1083
* The fact that a background thread can get a open table
1084
* handle while a user thread is locking and flushing a table
1085
* led to the situation that the checkpointer
1086
* could flush at the same time as a user process
1087
* which was flushing due to a rename.
1089
* This led to the situation described here: {FLUSH-BUG},
1090
* which is now fixed.
1094
* This error occurred during count_distinct3
1096
* The sweeper is waiting for the free'er, but the sweeper has a table
1097
* open (Table ./test/t2 test, ID 2)
1099
* if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 30000)) {
1100
* dcg->tcm_threads_waiting--;
1103
* #3 0x00e2a5b6 in xt_p_cond_timedwait at pthread_xt.cc:697
1104
* #4 0x00e51543 in xt_timed_wait_cond at thread_xt.cc:2053
1105
* #5 0x00e38fac in XTTabCache::tc_fetch at tabcache_xt.cc:682
1106
* #6 0x00e3974c in XTTabCache::xt_tc_write_cond at tabcache_xt.cc:279
1107
* #7 0x00e5ce31 in xn_sw_cleanup_variation at xaction_xt.cc:2101
1108
* #8 0x00e5d619 in xn_sw_cleanup_xact at xaction_xt.cc:2327
1109
* #9 0x00e5dddd in xn_sw_main at xaction_xt.cc:2608
1110
* #10 0x00e5e123 in xn_sw_run_thread at xaction_xt.cc:2741
1111
* #11 0x00e50640 in thr_main at thread_xt.cc:1081
1112
* #12 0x91fdb155 in _pthread_start
1113
* #13 0x91fdb012 in thread_start
1115
* The user thread is trying to drop the table but the table has 1 open table
1116
* (Table ./test/t2 test, ID 2)
1118
* while (table_pool->opt_total_open > 0) {
1119
* xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
1121
* #3 0x00e2a5b6 in xt_p_cond_timedwait at pthread_xt.cc:697
1122
* #4 0x00e51543 in xt_timed_wait_cond at thread_xt.cc:2053
1123
* But fix: xt_db_wait_for_open_tables() has been removed!
1124
* #5 0x00de88a2 in xt_db_wait_for_open_tables at database_xt.cc:966
1125
* #6 0x00e3e983 in tab_lock_table at table_xt.cc:1557
1126
* #7 0x00e3eb1e in xt_drop_table at table_xt.cc:1941
1127
* #8 0x00e0e78e in ha_pbxt::delete_table at ha_pbxt.cc:4848
1128
* #9 0x00243a14 in handler::ha_delete_table at handler.cc:3311
1129
* #10 0x00243b7e in ha_delete_table at handler.cc:1954
1130
* #11 0x0025c990 in mysql_rm_table_part2 at sql_table.cc:1724
1131
* #12 0x0025cee8 in mysql_rm_table at sql_table.cc:1518
1132
* #13 0x0011256c in mysql_execute_command at sql_parse.cc:3357
1133
* #14 0x001188ed in mysql_parse at sql_parse.cc:5929
1134
* #15 0x00119663 in dispatch_command at sql_parse.cc:1216
1135
* #16 0x0011a960 in do_command at sql_parse.cc:857
1136
* #17 0x00105a3c in handle_one_connection at sql_connect.cc:1115
1137
* #18 0x91fdb155 in _pthread_start
1138
* #19 0x91fdb012 in thread_start
1140
* The free'er would like to free some memory but cannot because it requires
1141
* an open table handle, but the pool is locked by the user thread which
1142
* wants to drop the table.
1144
* // Free'er wants to get an open table, but the pool is locked (by the renamer)
1145
* while (table_pool->opt_locked && !(i_am_background && table_pool->opt_flushing)) {
1146
* xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
1147
* if (!(table_pool = db_get_open_table_pool(db, tab_id)))
1150
* #0 0x91fb146e in __semwait_signal
1151
* #1 0x91fdc3e6 in _pthread_cond_wait
1152
* #2 0x920019f8 in pthread_cond_timedwait$UNIX2003
1153
* #3 0x00e2a5b6 in xt_p_cond_timedwait at pthread_xt.cc:697
1154
* #4 0x00e51543 in xt_timed_wait_cond at thread_xt.cc:2053
1155
* #5 0x00de8d8a in xt_db_open_pool_table at database_xt.cc:1091
1156
* #6 0x00e39ce3 in tabc_get_table at tabcache_xt.cc:983
1157
* #7 0x00e39deb in tabc_free_page at tabcache_xt.cc:1028
1158
* #8 0x00e3a5d2 in tabc_fr_main at tabcache_xt.cc:1227
1159
* #9 0x00e3a954 in tabc_fr_run_thread at tabcache_xt.cc:1290
1160
* #10 0x00e50640 in thr_main at thread_xt.cc:1081
1161
* #11 0x91fdb155 in _pthread_start
1162
* #12 0x91fdb012 in thread_start
1164
* My proposed solution:
1166
* Firstly, It can be assumed that background (system processes) will eventually
1167
* stop activity, once all work has been done.
1168
* So allowing them to proceed, even when the table is locked will
1169
* not lead to a livelock.
1171
* This means that we should allow background processes to proceed with a
1172
* locked table, as long as the locker is waiting.
1174
while (table_pool->opt_locked) {
1175
if (i_am_background && table_pool->opt_locked != XT_TABLE_LOCKED) {
1176
/* Background processes can proceed, if the locker is not in the
1181
xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
1182
if (!(table_pool = db_get_open_table_pool(db, tab_id)))
1186
/* Moved from above, because db_get_open_table_pool() may return a different
1187
* pool on each call!
1189
pushr_(db_free_open_table_pool, table_pool);
1191
if ((ot = table_pool->opt_free_list)) {
1192
/* Remove from the free list: */
1193
table_pool->opt_free_list = ot->ot_otp_next_free;
1195
/* Remove from MRU list: */
1196
if (db->db_ot_pool.otp_lr_used == ot)
1197
db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
1198
if (db->db_ot_pool.otp_mr_used == ot)
1199
db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
1200
if (ot->ot_otp_lr_used)
1201
ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
1202
if (ot->ot_otp_mr_used)
1203
ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
1205
if (db->db_ot_pool.otp_lr_used)
1206
db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
1208
ASSERT(db->db_ot_pool.otp_total_free > 0);
1209
db->db_ot_pool.otp_total_free--;
1211
freer_(); // db_free_open_table_pool(table_pool)
1212
freer_(); // xt_unlock_mutex(&db->db_ot_pool.opt_lock)
1213
ot->ot_thread = self;
1217
if (!(tab = xt_use_table_by_id(self, db, tab_id, result))) {
1218
/* The table no longer exists, ignore the change: */
1219
freer_(); // db_free_open_table_pool(table_pool)
1220
freer_(); // xt_unlock_mutex(&db->db_ot_pool.opt_lock)
1224
/* xt_use_table_by_id returns a referenced tab! */
1225
pushr_(xt_heap_release, tab);
1226
if ((ot = xt_open_table(tab))) {
1227
ot->ot_thread = self;
1228
table_pool->opt_total_open++;
1230
freer_(); // xt_release_heap(tab)
1232
freer_(); // db_free_open_table_pool(table_pool)
1233
freer_(); // xt_unlock_mutex(&db->db_ot_pool.opt_lock)
1237
// Variant of xt_db_return_table_to_pool_ns() that accepts a thread argument.
// self is marked XT_UNUSED; ot is forwarded unchanged.
xtPublic void xt_db_return_table_to_pool(XTThreadPtr XT_UNUSED(self), XTOpenTablePtr ot)
1239
xt_db_return_table_to_pool_ns(ot);
1242
// Hands an open table back to the open-table pool of its database.
// ot: the open table being returned; the database is taken from
// ot->ot_table->tab_db. Only the _ns variants of the lock, unlock and
// broadcast calls are used, and errors end in xt_log_and_clear_exception_ns().
xtPublic void xt_db_return_table_to_pool_ns(XTOpenTablePtr ot)
1244
XTOpenTablePoolPtr table_pool;
1245
XTDatabaseHPtr db = ot->ot_table->tab_db;
1246
xtBool flush_table = TRUE;
1248
/* No open table returned to the pool should still
1249
* have a cache handle!
1251
ASSERT_NS(!ot->ot_ind_rhandle);
1252
// All pool bookkeeping below is done while holding opt_lock.
xt_lock_mutex_ns(&db->db_ot_pool.opt_lock);
1254
if (!(table_pool = db_get_open_table_pool(db, ot->ot_table->tab_id)))
1257
if (table_pool->opt_locked) {
1258
/* Table will be closed below, because the table is
1260
// Other handles are still open, so this close does not request a flush.
if (table_pool->opt_total_open > 1)
1261
flush_table = FALSE;
1264
/* Put it on the free list: */
1265
db->db_ot_pool.otp_total_free++;
1267
// Push onto the head of the free list of this table pool.
ot->ot_otp_next_free = table_pool->opt_free_list;
1268
table_pool->opt_free_list = ot;
1270
/* This is the time the table was freed: */
1271
ot->ot_otp_free_time = xt_db_approximate_time;
1273
/* Add to most recently used: */
1274
if ((ot->ot_otp_lr_used = db->db_ot_pool.otp_mr_used))
1275
db->db_ot_pool.otp_mr_used->ot_otp_mr_used = ot;
1276
ot->ot_otp_mr_used = NULL;
1277
db->db_ot_pool.otp_mr_used = ot;
1278
// The list was empty: this handle is also the least recently used one,
// and its free time becomes the free time recorded for the pool.
if (!db->db_ot_pool.otp_lr_used) {
1279
db->db_ot_pool.otp_lr_used = ot;
1280
db->db_ot_pool.otp_free_time = ot->ot_otp_free_time;
1287
// The lock is dropped around xt_close_table() and taken again afterwards.
xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1288
xt_close_table(ot, flush_table, FALSE);
1290
/* assume that table_pool cannot be invalidated in between as we have table_pool->opt_total_open > 0 */
1291
xt_lock_mutex_ns(&db->db_ot_pool.opt_lock);
1292
table_pool->opt_total_open--;
1295
db_free_open_table_pool(NULL, table_pool);
1297
// Wake every thread waiting on opt_cond.
if (!xt_broadcast_cond_ns(&db->db_ot_pool.opt_cond))
1299
xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1304
// Error exit: unlock, close the table with flushing requested, then log
// and clear the exception.
xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1306
xt_close_table(ot, TRUE, FALSE);
1307
xt_log_and_clear_exception_ns();
1310
// Enabling this define activates the TEST_FREE_OPEN_TABLES sections in
// xt_db_free_unused_open_tables().
//#define TEST_FREE_OPEN_TABLES
1313
// Redefines XT_OPEN_TABLE_FREE_TIME. The value is added to a recorded free
// time and compared with xt_db_approximate_time, so it is in the units of
// that clock (presumably seconds - TODO confirm).
// NOTE(review): confirm whether this override is wrapped in a conditional.
#undef XT_OPEN_TABLE_FREE_TIME
1314
#define XT_OPEN_TABLE_FREE_TIME 5
1317
// Closes open table handles that have been sitting unused in the pool of db.
// Work starts only when the number of free handles exceeds three times the
// number of tables, and continues until it is at most twice that number.
// A handle is only taken if it has been free for at least
// XT_OPEN_TABLE_FREE_TIME according to xt_db_approximate_time.
xtPublic void xt_db_free_unused_open_tables(XTThreadPtr self, XTDatabaseHPtr db)
1319
XTOpenTablePoolPtr table_pool;
1322
xtBool flush_table = TRUE;
1325
/* A quick check of the oldest free table: */
1326
if (xt_db_approximate_time < db->db_ot_pool.otp_free_time + XT_OPEN_TABLE_FREE_TIME)
1329
// A database without a table list counts as having zero tables.
table_count = db->db_table_by_id ? xt_sl_get_size(db->db_table_by_id) : 0;
1330
// Upper threshold: three free handles per table.
count = table_count * 3;
1333
#ifdef TEST_FREE_OPEN_TABLES
1336
if (db->db_ot_pool.otp_total_free > count) {
1337
XTOpenTablePtr ptr, pptr;
1339
// Lower threshold: free handles until two per table remain.
count = table_count * 2;
1342
#ifdef TEST_FREE_OPEN_TABLES
1345
xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
1346
pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
1348
while (db->db_ot_pool.otp_total_free > count) {
1349
// Candidates are taken from the least recently used end of the list.
ASSERT_NS(db->db_ot_pool.otp_lr_used);
1350
if (!(ot = db->db_ot_pool.otp_lr_used))
1353
/* Check how long the open table has been free: */
1354
if (xt_db_approximate_time < ot->ot_otp_free_time + XT_OPEN_TABLE_FREE_TIME)
1357
ot->ot_thread = self;
1359
/* Remove from MRU list: */
1360
db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
1361
if (db->db_ot_pool.otp_mr_used == ot)
1362
db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
1363
if (ot->ot_otp_lr_used)
1364
ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
1365
if (ot->ot_otp_mr_used)
1366
ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
1368
// The pool free time follows the new least recently used handle.
if (db->db_ot_pool.otp_lr_used)
1369
db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
1371
ASSERT(db->db_ot_pool.otp_total_free > 0);
1372
db->db_ot_pool.otp_total_free--;
1374
if (!(table_pool = db_get_open_table_pool(db, ot->ot_table->tab_id)))
1377
/* Find the open table in the table pool,
1378
* and remove it from the list:
1381
ptr = table_pool->opt_free_list;
1386
ptr = ptr->ot_otp_next_free;
1389
ASSERT_NS(ptr == ot);
1392
// Unlink ot: either from its predecessor or from the head of the list.
pptr->ot_otp_next_free = ot->ot_otp_next_free;
1394
table_pool->opt_free_list = ot->ot_otp_next_free;
1397
ASSERT_NS(table_pool->opt_total_open > 0);
1398
table_pool->opt_total_open--;
1399
// Other handles of this table are still open: close without flushing.
if (table_pool->opt_total_open > 0)
1400
flush_table = FALSE;
1404
db_free_open_table_pool(self, table_pool);
1408
/* Close the table, but not
1409
* while holding the lock.
1411
xt_close_table(ot, flush_table, FALSE);
1413
// Take the lock again before the loop condition is tested once more.
xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
1414
pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
1422
* -----------------------------------------------------------------------
1426
// Walks tk_notify_threads of the task, removes the task from the to-do list
// of each listed thread (under the lock of that thread), and signals a
// thread whose to-do list has become empty. The notify list is cleared at
// the end.
static void db_notify_all(XTTask *tk)
1430
// The walk ends at the first index for which pl_get_pointer() yields NULL.
for (u_int i=0; ; i++) {
1431
if (!(target = (XTThreadPtr) tk->tk_notify_threads.pl_get_pointer(i)))
1434
/* The task is on the thread's to-do list: */
1435
xt_lock_thread(target);
1436
if (target->st_tasks_todo.pl_remove_pointer(tk)) {
1437
/* Notify the thread that there are no more tasks to do... */
1438
if (target->st_tasks_todo.pl_is_empty())
1439
xt_signal_thread(target);
1440
/* Release tasks that were on the to-do list: */
1443
xt_unlock_thread(target);
1445
tk->tk_notify_threads.pl_clear();
1448
// Work loop of a pool thread. Tasks are taken from the task queue of the
// database (db_task_queue_front and db_task_queue_back), each one is run
// through tk_task(), and the outcome is handed to the threads registered in
// tk_waiting_threads of the task. The queue is accessed under db_pool_lock.
static void db_thread_pool_main(XTThreadPtr self)
1450
XTDatabaseHPtr db = self->st_database;
1453
xtBool job_done = FALSE;
1456
xt_lock_mutex(self, &db->db_pool_lock);
1457
pushr_(xt_unlock_mutex, &db->db_pool_lock);
1460
db->db_pool_job_count--;
1469
// Wait on db_pool_cond until a task is queued or this thread must quit.
while (!self->t_quit && !(tk = db->db_task_queue_front)) {
1470
/* Wait for 1/10 second (to ensure we quit on time): */
1471
xt_timed_wait_cond(self, &db->db_pool_cond, &db->db_pool_lock, 100);
1479
// Unlink the task from the front; an empty queue also resets the back.
db->db_task_queue_front = tk->tk_task_list_next;
1480
if (!db->db_task_queue_front)
1481
db->db_task_queue_back = NULL;
1485
/* Perform the task: */
1487
if (!tk->tk_task(self)) {
1488
/* Transfer error to the task: */
1489
tk->tk_success = false;
1490
// The exception of this thread is copied into a record allocated for
// the task.
if ((tk->tk_exception = (XTExceptionPtr) xt_malloc_ns(sizeof(XTExceptionRec))))
1491
*tk->tk_exception = self->t_exception;
1493
// Presumably the branch taken when the allocation above failed.
tk->tk_out_of_memory = true;
1496
tk->tk_success = true;
1499
// Cleared before the waiting threads are updated below.
tk->tk_running = FALSE;
1501
/* Notify any there were forgotten: */
1504
/* The task is done: */
1505
if (tk->tk_waiting_threads.pl_is_empty()) {
1506
/* No waiting tasks, log the error: */
1507
if (!tk->tk_success)
1508
xt_log_and_clear_exception(self);
1511
// Move the task from the to-do list to the done list of every waiter.
for (u_int i=0; ; i++) {
1512
if (!(target = (XTThreadPtr) tk->tk_waiting_threads.pl_get_pointer(i)))
1515
/* The task is on the thread's to-do list: */
1516
xt_lock_thread(target);
1517
if (target->st_tasks_todo.pl_remove_pointer(tk)) {
1518
/* Add to the done list: */
1519
if (!target->st_tasks_done.pl_add_pointer(tk)) {
1521
xt_log_and_clear_exception(self);
1524
/* Notify the thread that there are no more tasks to do... */
1525
if (target->st_tasks_todo.pl_is_empty())
1526
xt_signal_thread(target);
1528
xt_unlock_thread(target);
1530
tk->tk_waiting_threads.pl_clear();
1534
/* We assume the reference is required to take the lock!
1540
// Per-thread data attached to a pool thread: the database the thread
// serves. Read back through self->t_data in db_thread_pool_run_thread() and
// through the data argument of db_free_pool_thread().
typedef struct DBThreadData {
1541
XTDatabaseHPtr td_db;
1542
} DBThreadDataRec, *DBThreadDataPtr;
1544
// Thread function of a pool thread (handed to xt_run_thread() and
// xt_run_thread_ns()). The database is read from the DBThreadData stored in
// self->t_data; the thread then repeatedly attaches to that database and
// runs db_thread_pool_main().
static void *db_thread_pool_run_thread(XTThreadPtr self)
1546
DBThreadDataPtr td = (DBThreadDataPtr) self->t_data;
1547
XTDatabaseHPtr db = td->td_db;
1550
/* Note, the MySQL thread will be free when the this
1553
if (!myxt_create_thread())
1556
// Runs until the thread is asked to quit or i reaches 10.
while (!self->t_quit && i<10) {
1558
/* Use the database: */
1559
xt_use_database(self, db, XT_FOR_POOL);
1561
/* {BACKGROUND-RELEASE-DB} */
1562
xt_heap_release(self, self->st_database);
1564
db_thread_pool_main(self);
1567
/* This error is "normal"! */
1568
// Logged unless the error is XT_ERR_NO_DICTIONARY or a caught SIGTERM.
if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
1569
!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
1570
self->t_exception.e_sys_err == SIGTERM))
1571
xt_log_and_clear_exception(self);
1575
/* Avoid releasing the database (done above) */
1576
self->st_database = NULL;
1577
xt_unuse_database(self, self);
1582
/* Pause, in case the error repeats: */
1583
// 10000 is ten seconds, given that the argument is in milliseconds as the
// function name indicates.
xt_sleep_milli_second(10000);
1590
// Thread-data free callback registered by db_create_pool_thread() through
// xt_set_thread_data(). Frees the DBThreadData record and unlinks self from
// the db_thread_pool list of the database, under db_pool_lock.
static void db_free_pool_thread(XTThreadPtr self, void *data)
1592
DBThreadDataPtr td = (DBThreadDataPtr) data;
1593
// db is read from the record before the record is freed below.
XTDatabaseHPtr db = td->td_db;
1594
XTThreadPtr thread, pthread;
1596
xt_free(self, data);
1598
xt_lock_mutex(self, &db->db_pool_lock);
1599
pushr_(xt_unlock_mutex, &db->db_pool_lock);
1601
/* Remove the thread from the pool: */
1603
thread = db->db_thread_pool;
1604
// NOTE(review): this condition does not test for the end of the list;
// presumably self is always linked into db_thread_pool here - confirm.
while (thread != self) {
1606
thread = thread->st_pool_next;
1608
if (thread == self) {
1609
db->db_pool_thread_count--;
1611
// Unlink either from the predecessor or from the head of the list.
pthread->st_pool_next = thread->st_pool_next;
1613
db->db_thread_pool = thread->st_pool_next;
1616
freer_(); // xt_unlock_mutex(&db->db_pool_lock)
1619
// Creates (but does not start) a new pool thread for db and links it at the
// head of db->db_thread_pool. A DBThreadData record is allocated and
// attached to the thread with db_free_pool_thread as its free function.
// Returns the new thread.
// NOTE(review): the caller in xt_run_async_task() holds db_pool_lock around
// this call; confirm that every caller does.
static XTThreadPtr db_create_pool_thread(XTDatabaseHPtr db)
1622
char name[PATH_MAX];
1626
/* Note, this is just a test to see if we can create a MySQL thread.
1627
* On shutdown, this is sometimes not possible!
1629
if (!myxt_create_thread_possible())
1632
if (!(td = (DBThreadDataPtr) xt_malloc_ns(sizeof(DBThreadDataRec))))
1636
db->db_pool_thread_count++;
1637
// The thread name has the form I/O-<last directory of db_main_path>-<count>.
// NOTE(review): sprintf is unbounded here, unlike the PATH_MAX limited
// xt_strcat and xt_strcati calls below - confirm the directory name always fits.
sprintf(name, "I/O-%s", xt_last_directory_of_path(db->db_main_path));
1638
xt_remove_dir_char(name);
1639
xt_strcat(PATH_MAX, name, "-");
1640
xt_strcati(PATH_MAX, name, db->db_pool_thread_count);
1641
if (!(thread = xt_create_daemon_ns(name)))
1644
// Link the new thread at the head of the pool list.
thread->st_pool_next = db->db_thread_pool;
1645
db->db_thread_pool = thread;
1647
xt_set_thread_data(thread, td, db_free_pool_thread);
1657
* Create a pool thread.
1659
* This ensures that there is at least one pool thread.
1661
* The advantage is that if, for some reason, no more threads
1662
* can be created, then we have one that can do all the work.
1664
// Overload that both creates a pool thread for db and starts it running
// db_thread_pool_run_thread, using the calls that take self.
static void db_create_pool_thread(XTThreadPtr self, XTDatabaseHPtr db)
1666
XTThreadPtr pool_thread = NULL;
1668
if (!(pool_thread = db_create_pool_thread(db)))
1671
xt_run_thread(self, pool_thread, db_thread_pool_run_thread);
1674
// Sets up the thread pool of db: initializes db_pool_lock and db_pool_cond,
// then creates and starts the first pool thread.
xtPublic void xt_db_thread_pool_init(XTThreadPtr self, XTDatabaseHPtr db)
1676
xt_init_mutex_with_autoname(self, &db->db_pool_lock);
1677
xt_init_cond(self, &db->db_pool_cond);
1678
db_create_pool_thread(self, db);
1681
// Tears down the thread pool of db: the pool threads are stopped first,
// then db_pool_lock and db_pool_cond are freed.
xtPublic void xt_db_thread_pool_exit(XTThreadPtr self, XTDatabaseHPtr db)
1683
xt_db_stop_pool_threads(self, db);
1684
xt_free_mutex(&db->db_pool_lock);
1685
xt_free_cond(&db->db_pool_cond);
1688
// Stops the pool threads of db one at a time. While db_thread_pool is not
// empty, the head thread is terminated, db_pool_cond is broadcast, and the
// caller waits for that thread to exit with db_pool_lock released.
xtPublic void xt_db_stop_pool_threads(XTThreadPtr self, XTDatabaseHPtr db)
1693
// Checked without the lock; nothing is done when the pool is empty.
if (db->db_thread_pool) {
1694
xt_lock_mutex(self, &db->db_pool_lock);
1695
pushr_(xt_unlock_mutex, &db->db_pool_lock);
1697
while ((thread = db->db_thread_pool)) {
1700
xt_terminate_thread(self, thread);
1701
// Wake pool threads waiting on db_pool_cond.
xt_broadcast_cond(self, &db->db_pool_cond);
1703
// The lock is released for the wait. db_free_pool_thread() takes the same
// lock to unlink a thread from the pool list.
freer_(); // xt_unlock_mutex(&db->db_pool_lock)
1705
xt_wait_for_thread_to_exit(tid, FALSE);
1707
xt_lock_mutex(self, &db->db_pool_lock);
1708
pushr_(xt_unlock_mutex, &db->db_pool_lock);
1711
freer_(); // xt_unlock_mutex(&db->db_pool_lock)
1716
* notify_complete means the thread will wait for the task to complete, and process the result.
1717
* notify_early means the thread will wait for a notification from the task itself.
1719
* Note. we assume the caller has a reference to the task. The caller is also responsible for
1720
* freeing the reference!
1722
* This function will take extra references as required!
1724
// Queues tk for execution by the pool threads of db.
// tk: the task to run.
// notify_complete, notify_early: when either is set, tk is added to the
//   to-do list of thread, and thread is added to tk_waiting_threads
//   (notify_complete) or tk_notify_threads of the task.
// thread: the thread that will wait for the task, or for its notification.
// db: the database whose pool lock, task queue and pool threads are used.
// A task that already has tk_running set is not queued again.
xtPublic xtBool xt_run_async_task(XTTask *tk, xtBool notify_complete, xtBool notify_early, XTThreadPtr thread, XTDatabaseHPtr db)
1727
if (notify_complete || notify_early) {
1728
xt_lock_thread(thread);
1729
/* Count the reference in the to-do list of the thread. */
1731
if (!thread->st_tasks_todo.pl_add_pointer(tk)) {
1733
xt_unlock_thread(thread);
1736
xt_unlock_thread(thread);
1738
if (notify_complete) {
1739
if (!tk->tk_waiting_threads.pl_add_pointer(thread))
1743
if (!tk->tk_notify_threads.pl_add_pointer(thread))
1748
if (!tk->tk_running) {
1749
XTThreadPtr pool_thread = NULL;
1751
/* Note, the caller should already have a reference,
1752
* otherwise it would not be safe to access the task.
1754
* When running, we add one more reference. The reference
1755
* is owned by the running thread.
1757
* The reference will be released, when the task execution
1761
tk->tk_running = TRUE;
1762
xt_lock_mutex_ns(&db->db_pool_lock);
1764
/* Check if we need to start a new thread: */
1765
db->db_pool_job_count++;
1766
// A new pool thread is created only when there are more jobs than pool
// threads and the pool is below XT_ASYNC_THREAD_COUNT.
if (db->db_pool_job_count > db->db_pool_thread_count && db->db_pool_thread_count < XT_ASYNC_THREAD_COUNT) {
1767
if (!(pool_thread = db_create_pool_thread(db))) {
1768
if (thread->t_exception.e_xt_err == XT_ERR_MYSQL_NO_THREAD ||
1769
thread->t_exception.e_xt_err == XT_ERR_MYSQL_ERROR) {
1770
/* We can ignore this error if error:
1771
* XT_ERR_MYSQL_NO_THREAD can occur on shutdown. If we already have
1772
* pool threads, then this is no problem!
1774
if (db->db_pool_thread_count > 0)
1775
goto ignore_create_thread_error;
1777
xt_unlock_mutex_ns(&db->db_pool_lock);
1782
ignore_create_thread_error:
1783
// Append tk at the back of the task queue, or make it the front when the
// queue is empty.
if (db->db_task_queue_back)
1784
db->db_task_queue_back->tk_task_list_next = tk;
1786
db->db_task_queue_front = tk;
1787
tk->tk_task_list_next = NULL;
1788
db->db_task_queue_back = tk;
1789
// Wake a pool thread waiting on db_pool_cond.
xt_signal_cond(NULL, &db->db_pool_cond);
1791
xt_unlock_mutex_ns(&db->db_pool_lock);
1794
if (!xt_run_thread_ns(pool_thread, db_thread_pool_run_thread))
1804
// Undo the registrations made at the top of the function.
if (notify_complete)
1805
tk->tk_waiting_threads.pl_remove_pointer(thread);
1807
tk->tk_notify_threads.pl_remove_pointer(thread);
1810
xt_lock_thread(thread);
1811
if (thread->st_tasks_todo.pl_remove_pointer(tk))
1813
xt_unlock_thread(thread);
1821
/*
 * Block until the to-do list of the given thread is empty.
 *
 * The list is first tested without the thread lock; if it is not empty
 * the thread lock is taken and the list is polled, sleeping in
 * xt_timed_wait_thread() with a timeout argument of 100 between polls.
 */
xtPublic void xt_wait_for_async_tasks(XTThreadPtr thread)
{
	if (!thread->st_tasks_todo.pl_is_empty()) {
		xt_lock_thread(thread);
		while (!thread->st_tasks_todo.pl_is_empty()) {
			xt_timed_wait_thread(thread, 100);
#ifdef DEBUG
			XTTask *tk;

			/* Sanity check: every task still on the to-do list should
			 * be marked as running. The loop has no condition and ends
			 * at the first NULL entry. (A loop header that uses "i++"
			 * as its condition never executes, because the condition
			 * is 0 on the first test.)
			 *
			 * NOTE(review): db_thread_pool_main() clears tk_running
			 * before it takes the thread lock to remove the task from
			 * this list; confirm that this assertion cannot fire in
			 * that window.
			 */
			for (u_int i=0; ; i++) {
				if (!(tk = (XTTask *) thread->st_tasks_todo.pl_get_pointer(i)))
					break;
				ASSERT_NS(tk->tk_running);
			}
#endif
		}
		xt_unlock_thread(thread);
	}
}
1841
// Waits until the to-do list of thread is empty, then drains its done list
// through xt_get_task_result(). For failed tasks the exception stored in
// the task is copied to the current thread or logged.
xtPublic xtBool xt_wait_for_async_task_results(XTThreadPtr thread)
1846
/* Wait for the task to finish: */
1847
xt_wait_for_async_tasks(thread);
1849
/* Collect the results: */
1850
while ((tk = xt_get_task_result(thread))) {
1851
if (!tk->tk_success) {
1852
// The error is transferred to the calling thread as returned by
// xt_get_self(), not to the thread argument.
XTThreadPtr self = xt_get_self();
1855
/* Transfer the first error to this thread... */
1856
if (tk->tk_exception)
1857
self->t_exception = *tk->tk_exception;
1859
// ENOMEM is registered instead; presumably this is the branch for a
// failed task that carries no exception record.
xt_register_errno(XT_REG_CONTEXT, ENOMEM);
1863
/* Log all other errors: */
1864
if (tk->tk_exception)
1865
xt_log_exception(self, tk->tk_exception, XT_LOG_ERROR);
1875
/* After waiting, call this function to collect the results.
1876
* NOTE: Returns a referenced task!
1878
// Takes the last entry of the done list of thread, under the thread lock,
// and removes it from that list.
xtPublic XTTask *xt_get_task_result(XTThreadPtr thread)
1882
xt_lock_thread(thread);
1883
// NOTE(review): with an empty list the index is pl_size() - 1; this relies
// on pl_get_pointer() returning NULL for an out of range index - confirm.
if ((tk = (XTTask *) thread->st_tasks_done.pl_get_pointer(thread->st_tasks_done.pl_size() - 1)))
1884
thread->st_tasks_done.pl_remove_pointer(tk);
1885
xt_unlock_thread(thread);
1890
* This function will wake up a waiting thread so that the
1891
* task continues without a waiting thread.
1893
// Acts only when the task has entries in tk_notify_threads.
xtPublic void xt_async_task_notify(XTTask *tk)
1895
if (!tk->tk_notify_threads.pl_is_empty()) {
1902
// Test task derived from XTTask, created by db_multi_async_test().
// tk_delete() destroys the object itself. tk_task(), tk_reference() and
// tk_release() are defined outside the class body.
class XTTestTask : public XTTask {
1904
XTTestTask() : XTTask(),
1909
virtual void tk_delete() { delete this; }
1910
virtual xtBool tk_task(XTThreadPtr thread);
1915
virtual void tk_reference();
1916
virtual void tk_release();
1919
// Task body of the test task: sleeps for tt_sleep_time. The thread argument
// is unnamed and unused.
xtBool XTTestTask::tk_task(XTThreadPtr)
1921
sleep(tt_sleep_time);
1925
// Test driver: starts count XTTestTask instances on the pool of the
// database that self has open, waits for all of them, then collects the
// results. A warning is logged when self has no database.
static void db_multi_async_test(XTThreadPtr self, int count)
1929
if (!self->st_database) {
1930
xt_logf(XT_NT_WARNING, "Open database required to run this test\n");
1934
for (int i=0; i<count; i++) {
1935
if (!(tt = new XTTestTask()))
1936
xt_throw_errno(XT_CONTEXT, ENOMEM);
1938
// Every task sleeps for count, not for the loop index.
tt->tt_sleep_time = count;
1942
// notify_complete is TRUE, so self waits for the task and gets its result.
if (!xt_run_async_task(tt, TRUE, FALSE, self, self->st_database)) {
1949
/* Wait for the task to finish: */
1950
xt_wait_for_async_tasks(self);
1952
/* Collect the results: */
1953
while ((tt = (XTTestTask *) xt_get_task_result(self)))
1957
void XTTestTask::tk_reference()
1962
void XTTestTask::tk_release()
1969
// Unit test entry point: runs db_multi_async_test() with a single task.
// NOTE(review): the first disabled call below passes three arguments, while
// db_multi_async_test() takes two.
xtPublic void xt_unit_test_async_task(XTThreadPtr self)
1971
db_multi_async_test(self, 1);
1972
//db_multi_async_test(self, 10, true);
1973
//db_multi_async_test(self, 5);