~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/database_xt.cc

  • Committer: Brian Aker
  • Date: 2010-02-14 01:56:51 UTC
  • mto: (1273.16.5 fix_is)
  • mto: This revision was merged to the branch mainline in revision 1300.
  • Revision ID: brian@gaz-20100214015651-ror9j0xu7dccz0ct
Two fixes for "make dist"

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2005 PrimeBase Technologies GmbH
2
 
 *
3
 
 * PrimeBase XT
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 
 *
19
 
 * 2005-01-15   Paul McCullagh
20
 
 *
21
 
 * H&G2JCtL
22
 
 */
23
 
 
24
 
#include "xt_config.h"
25
 
 
26
 
#ifdef DRIZZLED
27
 
#include <bitset>
28
 
#endif
29
 
 
30
 
#include <string.h>
31
 
#include <stdio.h>
32
 
#include <signal.h>
33
 
 
34
 
#include "pthread_xt.h"
35
 
#include "hashtab_xt.h"
36
 
#include "filesys_xt.h"
37
 
#include "database_xt.h"
38
 
#include "memory_xt.h"
39
 
#include "heap_xt.h"
40
 
#include "datalog_xt.h"
41
 
#include "strutil_xt.h"
42
 
#include "util_xt.h"
43
 
#include "trace_xt.h"
44
 
#include "myxt_xt.h"
45
 
 
46
 
#ifdef DEBUG
47
 
//#define XT_TEST_XACT_OVERFLOW
48
 
#endif
49
 
 
50
 
#ifndef NAME_MAX
51
 
#define NAME_MAX 128
52
 
#endif
53
 
 
54
 
/*
55
 
 * -----------------------------------------------------------------------
56
 
 * GLOBALS
57
 
 */
58
 
 
59
 
xtPublic XTDatabaseHPtr         pbxt_database = NULL;           // The global open database
60
 
 
61
 
xtPublic xtLogOffset            xt_db_log_file_threshold;
62
 
xtPublic size_t                         xt_db_log_buffer_size;
63
 
xtPublic size_t                         xt_db_transaction_buffer_size;
64
 
xtPublic size_t                         xt_db_checkpoint_frequency;
65
 
xtPublic off_t                          xt_db_data_log_threshold;
66
 
xtPublic size_t                         xt_db_data_file_grow_size;
67
 
xtPublic size_t                         xt_db_row_file_grow_size;
68
 
xtPublic size_t                         xt_db_record_write_threshold;
69
 
xtPublic int                            xt_db_garbage_threshold;
70
 
xtPublic int                            xt_db_log_file_count;
71
 
xtPublic int                            xt_db_auto_increment_mode;              /* 0 = MySQL compatible, 1 = PrimeBase Compatible. */
72
 
xtPublic int                            xt_db_offline_log_function;             /* 0 = recycle logs, 1 = delete logs, 2 = keep logs */
73
 
xtPublic int                            xt_db_sweeper_priority;                 /* 0 = low (default), 1 = normal, 2 = high */
74
 
/* Buggy at the moment.
75
 
 * For example, a large alter table hangs.
76
 
 */
77
 
xtPublic int                            xt_db_rewrite_flushing = 0;             /* 0 = Normal fsync, 1 = Re-write flushing. */
78
 
xtPublic int                            xt_db_index_dirty_threshold;
79
 
xtPublic int                            xt_db_flush_log_at_trx_commit;  /* 0 = no-write/no-flush, 1 = yes, 2 = write/no-flush */
80
 
 
81
 
xtPublic XTSortedListPtr        xt_db_open_db_by_id = NULL;
82
 
xtPublic XTHashTabPtr           xt_db_open_databases = NULL;
83
 
xtPublic time_t                         xt_db_approximate_time = 0;             /* A "fast" alternative timer (not too accurate). */
84
 
 
85
 
static xtDatabaseID                             db_next_id = 1;
86
 
static volatile XTOpenFilePtr   db_lock_file = NULL;
87
 
 
88
 
/*
89
 
 * -----------------------------------------------------------------------
90
 
 * LOCK/UNLOCK INSTALLATION
91
 
 */
92
 
 
93
 
xtPublic void xt_lock_installation(XTThreadPtr self, char *installation_path)
94
 
{
95
 
        char                    file_path[PATH_MAX];
96
 
        char                    buffer[101];
97
 
        size_t                  red_size;
98
 
        llong                   pid;
99
 
        xtBool                  cd = pbxt_crash_debug;
100
 
 
101
 
        xt_strcpy(PATH_MAX, file_path, installation_path);
102
 
        xt_add_pbxt_file(PATH_MAX, file_path, "no-debug");
103
 
        if (xt_fs_exists(file_path))
104
 
                pbxt_crash_debug = FALSE;
105
 
        xt_strcpy(PATH_MAX, file_path, installation_path);
106
 
        xt_add_pbxt_file(PATH_MAX, file_path, "crash-debug");
107
 
        if (xt_fs_exists(file_path))
108
 
                pbxt_crash_debug = TRUE;
109
 
 
110
 
        if (pbxt_crash_debug != cd) {
111
 
                if (pbxt_crash_debug)
112
 
                        xt_logf(XT_NT_WARNING, "Crash debugging has been turned on ('crash-debug' file exists)\n");
113
 
                else
114
 
                        xt_logf(XT_NT_WARNING, "Crash debugging has been turned off ('no-debug' file exists)\n");
115
 
        }
116
 
        else if (pbxt_crash_debug)
117
 
                xt_logf(XT_NT_WARNING, "Crash debugging is enabled\n");
118
 
 
119
 
        /* Moved the lock file out of the pbxt directory so that
120
 
         * it is possible to drop the pbxt database!
121
 
         */
122
 
        xt_strcpy(PATH_MAX, file_path, installation_path);
123
 
        xt_add_dir_char(PATH_MAX, file_path);
124
 
        xt_strcat(PATH_MAX, file_path, "pbxt-lock");
125
 
        db_lock_file = xt_open_file(self, file_path, XT_FT_STANDARD, XT_FS_CREATE | XT_FS_MAKE_PATH, 0);
126
 
 
127
 
        try_(a) {
128
 
                if (!xt_lock_file(self, db_lock_file)) {
129
 
                        xt_logf(XT_NT_ERROR, "A server appears to already be running\n");
130
 
                        xt_logf(XT_NT_ERROR, "The file: %s, is locked\n", file_path);
131
 
                        xt_throw_xterr(XT_CONTEXT, XT_ERR_SERVER_RUNNING);
132
 
                }
133
 
                if (!xt_pread_file(db_lock_file, 0, 100, 0, buffer, &red_size, &self->st_statistics.st_rec, self))
134
 
                        xt_throw(self);
135
 
                if (red_size > 0) {
136
 
                        buffer[red_size] = 0;
137
 
#ifdef XT_WIN
138
 
                        pid = (llong) _atoi64(buffer);
139
 
#else
140
 
                        pid = atoll(buffer);
141
 
#endif
142
 
                        /* Problem with this code is, after a restart
143
 
                         * the process ID's are reused.
144
 
                         * If some system process grabs the proc id that
145
 
                         * the server had on the last run, then
146
 
                         * the database will not start.
147
 
                        if (xt_process_exists((xtProcID) pid)) {
148
 
                                xt_logf(XT_NT_ERROR, "A server appears to already be running, process ID: %lld\n", pid);
149
 
                                xt_logf(XT_NT_ERROR, "Remove the file: %s, if this is not the case\n", file_path);
150
 
                                xt_throw_xterr(XT_CONTEXT, XT_ERR_SERVER_RUNNING);
151
 
                        }
152
 
                        */
153
 
                        xt_logf(XT_NT_INFO, "The server was not shutdown correctly, recovery required\n");
154
 
#ifdef XT_BACKUP_BEFORE_RECOVERY
155
 
                        if (pbxt_crash_debug) {
156
 
                                /* The server was not shut down correctly. Make a backup before
157
 
                                 * we start recovery.
158
 
                                 */
159
 
                                char extension[100];
160
 
 
161
 
                                for (int i=1;;i++) {
162
 
                                        xt_strcpy(PATH_MAX, file_path, installation_path);
163
 
                                        xt_remove_dir_char(file_path);
164
 
                                        sprintf(extension, "-recovery-%d", i);
165
 
                                        xt_strcat(PATH_MAX, file_path, extension);
166
 
                                        if (!xt_fs_exists(file_path))
167
 
                                                break;
168
 
                                }
169
 
                                xt_logf(XT_NT_INFO, "In order to reproduce recovery errors a backup of the installation\n");
170
 
                                xt_logf(XT_NT_INFO, "will be made to:\n");
171
 
                                xt_logf(XT_NT_INFO, "%s\n", file_path);
172
 
                                xt_logf(XT_NT_INFO, "Copy in progress...\n");
173
 
                                xt_fs_copy_dir(self, installation_path, file_path);
174
 
                                xt_logf(XT_NT_INFO, "Copy OK\n");
175
 
                        }
176
 
#endif
177
 
                }
178
 
 
179
 
                sprintf(buffer, "%lld", (llong) xt_getpid());
180
 
                xt_set_eof_file(self, db_lock_file, 0);
181
 
                if (!xt_pwrite_file(db_lock_file, 0, strlen(buffer), buffer, &self->st_statistics.st_rec, self))
182
 
                        xt_throw(self);
183
 
        }
184
 
        catch_(a) {
185
 
                xt_close_file_ns(db_lock_file);
186
 
                db_lock_file = NULL;
187
 
                xt_throw(self);
188
 
        }
189
 
        cont_(a);
190
 
}
191
 
 
192
 
xtPublic void xt_unlock_installation(XTThreadPtr self, char *installation_path)
193
 
{
194
 
        if (db_lock_file) {
195
 
                char lock_file[PATH_MAX];
196
 
 
197
 
                xt_unlock_file(NULL, db_lock_file);
198
 
                xt_close_file_ns(db_lock_file);
199
 
                db_lock_file = NULL;
200
 
 
201
 
                xt_strcpy(PATH_MAX, lock_file, installation_path);
202
 
                xt_add_dir_char(PATH_MAX, lock_file);
203
 
                xt_strcat(PATH_MAX, lock_file, "pbxt-lock");
204
 
                xt_fs_delete(self, lock_file);
205
 
        }
206
 
}
207
 
 
208
 
int *xt_bad_pointer = 0;
209
 
 
210
 
void xt_crash_me(void)
211
 
{
212
 
        if (pbxt_crash_debug)
213
 
                *xt_bad_pointer = 123;
214
 
}
215
 
 
216
 
/*
217
 
 * -----------------------------------------------------------------------
218
 
 * INIT/EXIT DATABASE
219
 
 */
220
 
 
221
 
static xtBool db_hash_comp(void *key, void *data)
222
 
{
223
 
        XTDatabaseHPtr  db = (XTDatabaseHPtr) data;
224
 
 
225
 
        return strcmp((char *) key, db->db_name) == 0;
226
 
}
227
 
 
228
 
static xtHashValue db_hash(xtBool is_key, void *key_data)
229
 
{
230
 
        XTDatabaseHPtr  db = (XTDatabaseHPtr) key_data;
231
 
 
232
 
        if (is_key)
233
 
                return xt_ht_hash((char *) key_data);
234
 
        return xt_ht_hash(db->db_name);
235
 
}
236
 
 
237
 
static xtBool db_hash_comp_ci(void *key, void *data)
238
 
{
239
 
        XTDatabaseHPtr  db = (XTDatabaseHPtr) data;
240
 
 
241
 
        return strcasecmp((char *) key, db->db_name) == 0;
242
 
}
243
 
 
244
 
static xtHashValue db_hash_ci(xtBool is_key, void *key_data)
245
 
{
246
 
        XTDatabaseHPtr  db = (XTDatabaseHPtr) key_data;
247
 
 
248
 
        if (is_key)
249
 
                return xt_ht_casehash((char *) key_data);
250
 
        return xt_ht_casehash(db->db_name);
251
 
}
252
 
 
253
 
static void db_hash_free(XTThreadPtr self, void *data)
254
 
{
255
 
        xt_heap_release(self, (XTDatabaseHPtr) data);
256
 
}
257
 
 
258
 
static int db_cmp_db_id(struct XTThread *XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
259
 
{
260
 
        xtDatabaseID    db_id = *((xtDatabaseID *) a);
261
 
        XTDatabaseHPtr  *db_ptr = (XTDatabaseHPtr *) b;
262
 
 
263
 
        if (db_id == (*db_ptr)->db_id)
264
 
                return 0;
265
 
        if (db_id < (*db_ptr)->db_id)
266
 
                return -1;
267
 
        return 1;
268
 
}
269
 
 
270
 
xtPublic void xt_init_databases(XTThreadPtr self)
271
 
{
272
 
        if (pbxt_ignore_case)
273
 
                xt_db_open_databases = xt_new_hashtable(self, db_hash_comp_ci, db_hash_ci, db_hash_free, TRUE, TRUE);
274
 
        else
275
 
                xt_db_open_databases = xt_new_hashtable(self, db_hash_comp, db_hash, db_hash_free, TRUE, TRUE);
276
 
        xt_db_open_db_by_id = xt_new_sortedlist(self, sizeof(XTDatabaseHPtr), 20, 10, db_cmp_db_id, NULL, NULL, FALSE, FALSE);
277
 
}
278
 
 
279
 
xtPublic void xt_stop_database_threads(XTThreadPtr self, xtBool sync)
280
 
{
281
 
        u_int                   len = 0;
282
 
        XTDatabaseHPtr  *dbptr;
283
 
        XTDatabaseHPtr  db = NULL;
284
 
        
285
 
        if (xt_db_open_db_by_id)
286
 
                len = xt_sl_get_size(xt_db_open_db_by_id);
287
 
        for (u_int i=0; i<len; i++) {
288
 
                if ((dbptr = (XTDatabaseHPtr *) xt_sl_item_at(xt_db_open_db_by_id, i))) {
289
 
                        db = *dbptr;
290
 
 
291
 
                        if (sync) {
292
 
                                /* Wait for the sweeper: */
293
 
                                xt_wait_for_sweeper(self, db, 16);
294
 
                                
295
 
                                /* Wait for the writer: */
296
 
                                xt_wait_for_writer(self, db);
297
 
 
298
 
                                /* Wait for the checkpointer: */
299
 
                                xt_wait_for_checkpointer(self, db);
300
 
                        }
301
 
                        xt_stop_flusher(self, db);
302
 
                        xt_stop_checkpointer(self, db);
303
 
                        xt_stop_writer(self, db);
304
 
                        xt_stop_sweeper(self, db);
305
 
                        xt_stop_compactor(self, db);
306
 
                        xt_db_stop_pool_threads(self, db);
307
 
                }
308
 
        }
309
 
}
310
 
 
311
 
xtPublic void xt_exit_databases(XTThreadPtr self)
312
 
{
313
 
        if (xt_db_open_databases) {
314
 
                xt_free_hashtable(self, xt_db_open_databases);
315
 
                xt_db_open_databases = NULL;
316
 
        }
317
 
        if (xt_db_open_db_by_id) {
318
 
                xt_free_sortedlist(self, xt_db_open_db_by_id);
319
 
                xt_db_open_db_by_id = NULL;
320
 
        }
321
 
}
322
 
 
323
 
xtPublic void xt_create_database(XTThreadPtr self, char *path)
324
 
{
325
 
        xt_fs_mkdir(self, path);
326
 
}
327
 
 
328
 
static void db_finalize(XTThreadPtr self, void *x)
329
 
{
330
 
        XTDatabaseHPtr  db = (XTDatabaseHPtr) x;
331
 
 
332
 
        xt_stop_flusher(self, db);
333
 
        xt_stop_checkpointer(self, db);
334
 
        xt_stop_compactor(self, db);
335
 
        xt_stop_sweeper(self, db);
336
 
        xt_stop_writer(self, db);
337
 
        xt_db_thread_pool_exit(self, db);
338
 
 
339
 
        xt_sl_delete(self, xt_db_open_db_by_id, &db->db_id);
340
 
        /* 
341
 
         * Important is that xt_db_pool_exit() is called
342
 
         * before xt_xn_exit_db() because xt_xn_exit_db()
343
 
         * frees the checkpoint information which
344
 
         * may be required to shutdown the tables, which
345
 
         * flushes tables, and therefore does a checkpoint.
346
 
         */
347
 
        /* This was the previous order of shutdown:
348
 
        xt_xn_exit_db(self, db);
349
 
        xt_dl_exit_db(self, db);
350
 
        xt_db_pool_exit(self, db);
351
 
        db->db_indlogs.ilp_exit(self);
352
 
        */
353
 
 
354
 
        xt_db_pool_exit(self, db);
355
 
        db->db_indlogs.ilp_exit(self); 
356
 
        xt_dl_exit_db(self, db);
357
 
        xt_xn_exit_db(self, db);
358
 
        xt_tab_exit_db(self, db);
359
 
        if (db->db_name) {
360
 
                xt_free(self, db->db_name);
361
 
                db->db_name = NULL;
362
 
        }
363
 
        if (db->db_main_path) {
364
 
                xt_free(self, db->db_main_path);
365
 
                db->db_main_path = NULL;
366
 
        }
367
 
        xt_free_mutex(&db->db_init_sweep_lock);
368
 
}
369
 
 
370
 
static void db_onrelease(void *XT_UNUSED(x))
371
 
{
372
 
        /* Signal threads waiting for exclusive use of the database: */
373
 
        if (xt_db_open_databases)       // The database may already be closed.
374
 
                xt_ht_signal(NULL, xt_db_open_databases);
375
 
}
376
 
 
377
 
xtPublic void xt_add_pbxt_file(size_t size, char *path, const char *file)
378
 
{
379
 
        xt_add_dir_char(size, path);
380
 
        xt_strcat(size, path, "pbxt");
381
 
        xt_add_dir_char(size, path);
382
 
        xt_strcat(size, path, file);
383
 
}
384
 
 
385
 
xtPublic void xt_add_location_file(size_t size, char *path)
386
 
{
387
 
        xt_add_dir_char(size, path);
388
 
        xt_strcat(size, path, "pbxt");
389
 
        xt_add_dir_char(size, path);
390
 
        xt_strcat(size, path, "location");
391
 
}
392
 
 
393
 
xtPublic void xt_add_tables_file(size_t size, char *path)
394
 
{
395
 
        xt_add_dir_char(size, path);
396
 
        xt_strcat(size, path, "pbxt");
397
 
        xt_add_dir_char(size, path);
398
 
        xt_strcat(size, path, "tables");
399
 
}
400
 
 
401
 
xtPublic void xt_add_pbxt_dir(size_t size, char *path)
402
 
{
403
 
        xt_add_dir_char(size, path);
404
 
        xt_strcat(size, path, "pbxt");
405
 
}
406
 
 
407
 
xtPublic void xt_add_system_dir(size_t size, char *path)
408
 
{
409
 
        xt_add_dir_char(size, path);
410
 
        xt_strcat(size, path, "pbxt");
411
 
        xt_add_dir_char(size, path);
412
 
        xt_strcat(size, path, "system");
413
 
}
414
 
 
415
 
xtPublic void xt_add_data_dir(size_t size, char *path)
416
 
{
417
 
        xt_add_dir_char(size, path);
418
 
        xt_strcat(size, path, "pbxt");
419
 
        xt_add_dir_char(size, path);
420
 
        xt_strcat(size, path, "data");
421
 
}
422
 
 
423
 
/*
424
 
 * I have a problem here. I cannot rely on the path given to xt_get_database() to be
425
 
 * consistant. When called from ha_create_table() the path is not modified.
426
 
 * However when called from ha_open() the path is first transformed by a call to
427
 
 * fn_format(). I have given an example from a stack trace below.
428
 
 *
429
 
 * In this case the odd path comes from the option:
430
 
 * --tmpdir=/Users/build/Development/mysql/debug-mysql/mysql-test/var//tmp
431
 
 *
432
 
 * #3  0x001a3818 in ha_pbxt::create(char const*, st_table*, st_ha_create_information*) 
433
 
 *     (this=0x2036898, table_path=0xf0060bd0 "/users/build/development/mysql/debug-my
434
 
 *     sql/mysql-test/var//tmp/#sql5718_1_0.frm", table_arg=0xf00601c0,
435
 
 *     create_info=0x2017410) at ha_pbxt.cc:2323
436
 
 * #4  0x00140d74 in ha_create_table(char const*, st_ha_create_information*, bool) 
437
 
 *     (name=0xf0060bd0 "/users/build/development/mysql/debug-mysql/mysql-te
438
 
 *     st/var//tmp/#sql5718_1_0.frm", create_info=0x2017410, 
439
 
 *     update_create_info=false) at handler.cc:1387
440
 
 *
441
 
 * #4  0x0013f7a4 in handler::ha_open(char const*, int, int) (this=0x203ba98, 
442
 
 *     name=0xf005eb70 "/users/build/development/mysql/debug-mysql/mysql-te
443
 
 *     st/var/tmp/#sql5718_1_1", mode=2, test_if_locked=2) at handler.cc:993
444
 
 * #5  0x000cd900 in openfrm(char const*, char const*, unsigned, unsigned, 
445
 
 *     unsigned, st_table*) (name=0xf005f260 "/users/build/development/mys
446
 
 *     ql/debug-mysql/mysql-test/var//tmp/#sql5718_1_1.frm", 
447
 
 *     alias=0xf005fb90 "#sql-5718_1", db_stat=7, prgflag=44, 
448
 
 *     ha_open_flags=0, outparam=0x2039e18) at table.cc:771
449
 
 *
450
 
 * As a result, I no longer use the entire path as the key to find a database.
451
 
 * Just the last component of the path (i.e. the database name) should be
452
 
 * sufficient!?
453
 
 */
454
 
xtPublic XTDatabaseHPtr xt_get_database(XTThreadPtr self, char *path, xtBool multi_path)
455
 
{
456
 
        XTDatabaseHPtr  db = NULL;
457
 
        char                    db_path[PATH_MAX];
458
 
        char                    db_name[NAME_MAX];
459
 
        xtBool                  multi_path_db = FALSE;
460
 
 
461
 
        /* A database may not be in use when this is called. */
462
 
        ASSERT(!self->st_database);
463
 
        xt_ht_lock(self, xt_db_open_databases);
464
 
        pushr_(xt_ht_unlock, xt_db_open_databases);
465
 
 
466
 
        xt_strcpy(PATH_MAX, db_path, path);
467
 
        xt_add_location_file(PATH_MAX, db_path);
468
 
        if (multi_path || xt_fs_exists(db_path))
469
 
                multi_path_db = TRUE;
470
 
 
471
 
        xt_strcpy(PATH_MAX, db_path, path);
472
 
        xt_remove_dir_char(db_path);
473
 
        xt_strcpy(NAME_MAX, db_name, xt_last_directory_of_path(db_path));
474
 
 
475
 
        db = (XTDatabaseHPtr) xt_ht_get(self, xt_db_open_databases, db_name);
476
 
        if (!db) {
477
 
                pushsr_(db, xt_heap_release, (XTDatabaseHPtr) xt_heap_new(self, sizeof(XTDatabaseRec), db_finalize));
478
 
                xt_heap_set_release_callback(db, db_onrelease);
479
 
                xt_init_mutex_with_autoname(self, &db->db_init_sweep_lock);
480
 
                db->db_id = db_next_id++;
481
 
                db->db_name = xt_dup_string(self, db_name);
482
 
                db->db_main_path = xt_dup_string(self, db_path);
483
 
                db->db_multi_path = multi_path_db;
484
 
#ifdef XT_TEST_XACT_OVERFLOW
485
 
                /* Test transaction ID overflow: */
486
 
                db->db_xn_curr_id = 0xFFFFFFFF - 30;
487
 
#endif
488
 
                xt_db_pool_init(self, db);
489
 
                xt_tab_init_db(self, db);
490
 
                xt_dl_init_db(self, db);
491
 
 
492
 
                /* Initialize the index logs: */
493
 
                db->db_indlogs.ilp_init(self, db, XT_INDEX_WRITE_BUFFER_SIZE); 
494
 
 
495
 
                /* Recover in xt_xn_init_db() may use background threads!: */
496
 
                xt_db_thread_pool_init(self, db);
497
 
 
498
 
                xt_xn_init_db(self, db);
499
 
                xt_sl_insert(self, xt_db_open_db_by_id, &db->db_id, &db);
500
 
 
501
 
                xt_start_sweeper(self, db);
502
 
                xt_start_compactor(self, db);
503
 
                xt_start_writer(self, db);
504
 
                xt_start_checkpointer(self, db);
505
 
                if (xt_db_flush_log_at_trx_commit == 0 || xt_db_flush_log_at_trx_commit == 2)
506
 
                        xt_start_flusher(self, db);
507
 
 
508
 
                popr_();
509
 
                xt_ht_put(self, xt_db_open_databases, db);
510
 
 
511
 
                /* The recovery process could attach parts of the open
512
 
                 * database to the thread!
513
 
                 */
514
 
                xt_unuse_database(self, self);
515
 
        }
516
 
        xt_heap_reference(self, db);
517
 
        freer_();
518
 
 
519
 
        /* {INDEX-RECOV_ROWID}
520
 
         * Wait for sweeper to finish processing possibly
521
 
         * unswept transactions after recovery.
522
 
         * This is required because during recovery for
523
 
         * all index entries written the row_id is set.
524
 
         *
525
 
         * When the row ID is set, this means that the row
526
 
         * is "clean". i.e. visible to all transactions.
527
 
         *
528
 
         * Obviously this is not necessary the case for all
529
 
         * index entries recovered. For example, 
530
 
         * transactions that still need to be swept may be
531
 
         * rolled back.
532
 
         *
533
 
         * As a result, we have to wait the the sweeper
534
 
         * to complete. Only then can we be sure that
535
 
         * all index entries that are not visible have
536
 
         * been removed.
537
 
         *
538
 
         * REASON WHY WE SET ROWID ON RECOVERY:
539
 
         * The row ID is set on recovery because the
540
 
         * change to the index may be lost after a crash.
541
 
         * The change to the index is done by the sweeper, and
542
 
         * there is no record of this change in the log.
543
 
         * The sweeper will not "re-sweep" all transations
544
 
         * that are recovered. As a result, this upadte
545
 
         * of the index by the sweeper may be lost.
546
 
         *
547
 
         * {OPEN-DB-SWEEPER-WAIT}
548
 
         * This has been moved to after the release of the open
549
 
         * database lock because:
550
 
         *
551
 
         * - We are waiting for the sweeper which may run out of
552
 
         * record cache.
553
 
         * - If it runs out of cache it well wait
554
 
         * for the freeer thread.
555
 
         * - For the freeer thread to be able to work it needs
556
 
         * to open the database.
557
 
         * - To open the database it needs the open database
558
 
         * lock.
559
 
         */
560
 
        /*
561
 
         * This has been moved, see: {WAIT-FOR-SW-AFTER-RECOV}
562
 
        pushr_(xt_heap_release, db);
563
 
        xt_wait_for_sweeper(self, db, 0);
564
 
        popr_();
565
 
        */
566
 
 
567
 
        return db;
568
 
}
569
 
 
570
 
xtPublic XTDatabaseHPtr xt_get_database_by_id(XTThreadPtr self, xtDatabaseID db_id)
571
 
{
572
 
        XTDatabaseHPtr  *dbptr;
573
 
        XTDatabaseHPtr  db = NULL;
574
 
 
575
 
        xt_ht_lock(self, xt_db_open_databases);
576
 
        pushr_(xt_ht_unlock, xt_db_open_databases);
577
 
        if ((dbptr = (XTDatabaseHPtr *) xt_sl_find(self, xt_db_open_db_by_id, &db_id))) {
578
 
                db = *dbptr;
579
 
                xt_heap_reference(self, db);
580
 
        }
581
 
        freer_(); // xt_ht_unlock(xt_db_open_databases)
582
 
        return db;
583
 
}
584
 
 
585
 
xtPublic void xt_drop_database(XTThreadPtr self, XTDatabaseHPtr db)
586
 
{
587
 
        char                    path[PATH_MAX];
588
 
        char                    db_name[NAME_MAX];
589
 
        XTOpenDirPtr    od;
590
 
        char                    *file;
591
 
        XTTablePathPtr  *tp_ptr;
592
 
 
593
 
        xt_ht_lock(self, xt_db_open_databases);
594
 
        pushr_(xt_ht_unlock, xt_db_open_databases);
595
 
 
596
 
        /* Shutdown the database daemons: */
597
 
        xt_stop_flusher(self, db);
598
 
        xt_stop_checkpointer(self, db);
599
 
        xt_stop_sweeper(self, db);
600
 
        xt_stop_compactor(self, db);
601
 
        xt_stop_writer(self, db);
602
 
        xt_db_thread_pool_exit(self, db);
603
 
 
604
 
        /* Remove the database from the directory: */
605
 
        xt_strcpy(NAME_MAX, db_name, db->db_name);
606
 
        xt_ht_del(self, xt_db_open_databases, db_name);
607
 
 
608
 
        /* Release the lock on the database directory: */
609
 
        freer_(); // xt_ht_unlock(xt_db_open_databases)
610
 
 
611
 
        /* Delete the transaction logs: */
612
 
        xt_xlog_delete_logs(self, db);
613
 
 
614
 
        /* Delete the data logs: */
615
 
        xt_dl_delete_logs(self, db);
616
 
 
617
 
        for (u_int i=0; i<xt_sl_get_size(db->db_table_paths); i++) {
618
 
 
619
 
                tp_ptr = (XTTablePathPtr *) xt_sl_item_at(db->db_table_paths, i);
620
 
 
621
 
                xt_strcpy(PATH_MAX, path, (*tp_ptr)->tp_path);
622
 
 
623
 
                /* Delete all files in the database: */
624
 
                pushsr_(od, xt_dir_close, xt_dir_open(self, path, NULL));
625
 
                while (xt_dir_next(self, od)) {
626
 
                        file = xt_dir_name(self, od);
627
 
                        if (xt_ends_with(file, ".xtr") ||
628
 
                                xt_ends_with(file, ".xtd") ||
629
 
                                xt_ends_with(file, ".xti") ||
630
 
                                xt_ends_with(file, ".xt"))
631
 
                        {
632
 
                                xt_add_dir_char(PATH_MAX, path);
633
 
                                xt_strcat(PATH_MAX, path, file);
634
 
                                xt_fs_delete(self, path);
635
 
                                xt_remove_last_name_of_path(path);
636
 
                        }
637
 
                }
638
 
                freer_(); // xt_dir_close(od)
639
 
                
640
 
        }
641
 
        if (!db->db_multi_path) {
642
 
                xt_strcpy(PATH_MAX, path, db->db_main_path);
643
 
                xt_add_pbxt_dir(PATH_MAX, path);
644
 
                if (!xt_fs_rmdir(NULL, path))
645
 
                        xt_log_and_clear_exception(self);
646
 
        }
647
 
}
648
 
 
649
 
/*
650
 
 * Open/use a database.
651
 
 */
652
 
xtPublic void xt_open_database(XTThreadPtr self, char *path, xtBool multi_path)
653
 
{
654
 
        XTDatabaseHPtr db;
655
 
 
656
 
        /* We cannot get a database, without unusing the current
657
 
         * first. The reason is that the restart process will
658
 
         * partially set the current database!
659
 
         */
660
 
        xt_unuse_database(self, self);
661
 
        db = xt_get_database(self, path, multi_path);
662
 
        pushr_(xt_heap_release, db);
663
 
        xt_use_database(self, db, XT_FOR_USER);
664
 
        freer_();       // xt_heap_release(self, db);   
665
 
}
666
 
 
667
 
/* This function can only be called if you do not already have a database in
668
 
 * use. This is because to get a database pointer you are not allowed
669
 
 * to have a database in use!
670
 
 */
671
 
xtPublic void xt_use_database(XTThreadPtr self, XTDatabaseHPtr db, int what_for)
672
 
{
673
 
        /* Check if a transaction is in progress. If so,
674
 
         * we cannot change the database!
675
 
         */
676
 
        if (self->st_xact_data || self->st_database)
677
 
                xt_throw_xterr(XT_CONTEXT, XT_ERR_CANNOT_CHANGE_DB);
678
 
 
679
 
        xt_heap_reference(self, db);
680
 
        self->st_database = db;
681
 
#ifdef XT_WAIT_FOR_CLEANUP
682
 
        self->st_last_xact = 0;
683
 
        for (int i=0; i<XT_MAX_XACT_BEHIND; i++) {
684
 
                self->st_prev_xact[i] = db->db_xn_curr_id;
685
 
        }
686
 
#endif
687
 
        xt_xn_init_thread(self, what_for);
688
 
}
689
 
 
690
 
xtPublic void xt_unuse_database(XTThreadPtr self, XTThreadPtr other_thr)
691
 
{
692
 
        XTTask *tk;
693
 
 
694
 
        /* Wait for any asynchronous tasks to complete: */
695
 
        xt_wait_for_async_tasks(self);
696
 
 
697
 
        /* Free the results, if any: */
698
 
        while ((tk = (XTTask *) xt_get_task_result(self)))
699
 
                tk->tk_release();
700
 
 
701
 
        /* Abort the transacion if it belongs exclusively to this thread. */
702
 
        xt_lock_mutex(self, &other_thr->t_lock);
703
 
        pushr_(xt_unlock_mutex, &other_thr->t_lock);
704
 
 
705
 
        xt_xn_exit_thread(other_thr);
706
 
        if (other_thr->st_database) {
707
 
                xt_heap_release(self, other_thr->st_database);
708
 
                other_thr->st_database = NULL;
709
 
        }
710
 
        
711
 
        freer_();
712
 
}
713
 
 
714
 
xtPublic void xt_db_init_thread_ns(XTThreadPtr XT_UNUSED(new_thread))
715
 
{
716
 
#ifdef XT_IMPLEMENT_NO_ACTION
717
 
        memset(&new_thread->st_restrict_list, 0, sizeof(XTBasicListRec));
718
 
        new_thread->st_restrict_list.bl_item_size = sizeof(XTRestrictItemRec);
719
 
#endif
720
 
}
721
 
 
722
 
xtPublic void xt_db_exit_thread(XTThreadPtr self)
723
 
{
724
 
#ifdef XT_IMPLEMENT_NO_ACTION
725
 
        xt_bl_free(NULL, &self->st_restrict_list);
726
 
#endif
727
 
        xt_unuse_database(self, self);
728
 
}
729
 
 
730
 
/*
731
 
 * -----------------------------------------------------------------------
732
 
 * OPEN TABLE POOL
733
 
 */
734
 
 
735
 
#ifdef UNUSED_CODE
736
 
static void check_free_list(XTDatabaseHPtr db)
737
 
{
738
 
        XTOpenTablePtr  ot;
739
 
        u_int                   cnt = 0;
740
 
 
741
 
        ot = db->db_ot_pool.otp_mr_used;
742
 
        if (ot)
743
 
                ASSERT_NS(!ot->ot_otp_mr_used);
744
 
        ot = db->db_ot_pool.otp_lr_used;
745
 
        if (ot)
746
 
                ASSERT_NS(!ot->ot_otp_lr_used);
747
 
        while (ot) {
748
 
                cnt++;
749
 
                ot = ot->ot_otp_mr_used;
750
 
        }
751
 
        ASSERT_NS(cnt == db->db_ot_pool.otp_total_free);
752
 
}
753
 
#endif
754
 
 
755
 
xtPublic void xt_db_pool_init(XTThreadPtr self, XTDatabaseHPtr db)
756
 
{
757
 
        memset(&db->db_ot_pool, 0, sizeof(XTAllTablePoolsRec));
758
 
        xt_init_mutex_with_autoname(self, &db->db_ot_pool.opt_lock);
759
 
        xt_init_cond(self, &db->db_ot_pool.opt_cond);
760
 
}
761
 
 
762
 
xtPublic void xt_db_pool_exit(XTThreadPtr self, XTDatabaseHPtr db)
763
 
{
764
 
        XTOpenTablePoolPtr      table_pool, tmp;
765
 
        XTOpenTablePtr          ot, tmp_ot;
766
 
 
767
 
        xt_free_mutex(&db->db_ot_pool.opt_lock);
768
 
        xt_free_cond(&db->db_ot_pool.opt_cond);
769
 
        
770
 
        for (u_int i=0; i<XT_OPEN_TABLE_POOL_HASH_SIZE; i++) {
771
 
                table_pool = db->db_ot_pool.otp_hash[i];
772
 
                while (table_pool) {
773
 
                        tmp = table_pool->opt_next_hash;
774
 
                        ot = table_pool->opt_free_list;
775
 
                        while (ot) {
776
 
                                tmp_ot = ot->ot_otp_next_free;
777
 
                                ot->ot_thread = self;
778
 
                                xt_close_table(ot, TRUE, FALSE);
779
 
                                ot = tmp_ot;
780
 
                        }
781
 
                        xt_free(self, table_pool);
782
 
                        table_pool = tmp;
783
 
                }
784
 
        }
785
 
}
786
 
 
787
 
static XTOpenTablePoolPtr db_get_open_table_pool(XTDatabaseHPtr db, xtTableID tab_id)
788
 
{
789
 
        XTOpenTablePoolPtr      table_pool;
790
 
        u_int                           hash;
791
 
 
792
 
        hash = tab_id % XT_OPEN_TABLE_POOL_HASH_SIZE;
793
 
        table_pool = db->db_ot_pool.otp_hash[hash];
794
 
        while (table_pool) {
795
 
                if (table_pool->opt_tab_id == tab_id)
796
 
                        return table_pool;
797
 
                table_pool = table_pool->opt_next_hash;
798
 
        }
799
 
        
800
 
        if (!(table_pool = (XTOpenTablePoolPtr) xt_malloc_ns(sizeof(XTOpenTablePoolRec))))
801
 
                return NULL;
802
 
 
803
 
        table_pool->opt_db = db;
804
 
        table_pool->opt_tab_id = tab_id;
805
 
        table_pool->opt_total_open = 0;
806
 
        table_pool->opt_locked = XT_TABLE_NOT_LOCKED;
807
 
        table_pool->opt_free_list = NULL;
808
 
        table_pool->opt_next_hash = db->db_ot_pool.otp_hash[hash];
809
 
        db->db_ot_pool.otp_hash[hash] = table_pool;
810
 
        
811
 
        return table_pool;
812
 
}
813
 
 
814
 
static void db_free_open_table_pool(XTThreadPtr self, XTOpenTablePoolPtr table_pool)
815
 
{
816
 
        if (!table_pool->opt_locked && !table_pool->opt_total_open) {
817
 
                XTOpenTablePoolPtr      ptr, pptr = NULL;
818
 
                u_int                           hash;
819
 
 
820
 
                hash = table_pool->opt_tab_id % XT_OPEN_TABLE_POOL_HASH_SIZE;
821
 
                ptr = table_pool->opt_db->db_ot_pool.otp_hash[hash];
822
 
                while (ptr) {
823
 
                        if (ptr == table_pool)
824
 
                                break;
825
 
                        pptr = ptr;
826
 
                        ptr = ptr->opt_next_hash;
827
 
                }
828
 
                
829
 
                if (ptr == table_pool) {
830
 
                        if (pptr)
831
 
                                pptr->opt_next_hash = table_pool->opt_next_hash;
832
 
                        else
833
 
                                table_pool->opt_db->db_ot_pool.otp_hash[hash] = table_pool->opt_next_hash;
834
 
                }
835
 
 
836
 
                xt_free(self, table_pool);
837
 
        }
838
 
}
839
 
 
840
 
static XTOpenTablePoolPtr db_lock_table_pool(XTThreadPtr self, XTDatabaseHPtr db, xtTableID tab_id, xtBool flush_table)
841
 
{
842
 
        XTOpenTablePoolPtr      table_pool;
843
 
        XTOpenTablePtr          ot, tmp_ot;
844
 
 
845
 
        xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
846
 
        pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
847
 
 
848
 
        if (!(table_pool = db_get_open_table_pool(db, tab_id)))
849
 
                xt_throw(self);
850
 
 
851
 
        /* Wait for the lock: */
852
 
        while (table_pool->opt_locked) {
853
 
                xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
854
 
                if (!(table_pool = db_get_open_table_pool(db, tab_id)))
855
 
                        xt_throw(self);
856
 
        }
857
 
 
858
 
        /* Enter locking phase 1: */
859
 
        table_pool->opt_locked = XT_TABLE_LOCK_WAITING;
860
 
 
861
 
        if (flush_table) {
862
 
                /* Don't know if this is interesting as a phase, but anyway... */
863
 
                table_pool->opt_locked = XT_TABLE_LOCK_FLUSHING;
864
 
                freer_(); // xt_unlock_mutex(db_ot_pool.opt_lock)
865
 
 
866
 
                pushr_(xt_db_unlock_table_pool, table_pool);
867
 
                /* During this time, background processes can use the
868
 
                 * pool!
869
 
                 *
870
 
                 * May also do a flush, but this is now taken care
871
 
                 * of here {FLUSH-BUG}
872
 
                 */
873
 
                if ((ot = xt_db_open_pool_table(self, db, tab_id, NULL, TRUE))) {
874
 
                        pushr_(xt_db_return_table_to_pool, ot);
875
 
                        xt_sync_flush_table(self, ot, 0);
876
 
                        freer_(); //xt_db_return_table_to_pool_foreground(ot);
877
 
                }
878
 
 
879
 
                popr_(); // Discard xt_db_unlock_table_pool_no_lock(table_pool)
880
 
 
881
 
                xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
882
 
                pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
883
 
        }
884
 
        
885
 
        /* Free all open tables not in use: */
886
 
        ot = table_pool->opt_free_list;
887
 
        table_pool->opt_free_list = NULL;
888
 
        while (ot) {
889
 
                tmp_ot = ot->ot_otp_next_free;
890
 
 
891
 
                /* Remove from MRU list: */
892
 
                if (db->db_ot_pool.otp_lr_used == ot)
893
 
                        db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
894
 
                if (db->db_ot_pool.otp_mr_used == ot)
895
 
                        db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
896
 
                if (ot->ot_otp_lr_used)
897
 
                        ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
898
 
                if (ot->ot_otp_mr_used)
899
 
                        ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
900
 
 
901
 
                if (db->db_ot_pool.otp_lr_used)
902
 
                        db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
903
 
                
904
 
                ASSERT_NS(db->db_ot_pool.otp_total_free > 0);
905
 
                db->db_ot_pool.otp_total_free--;
906
 
 
907
 
                /* Close the table: */
908
 
                ASSERT(table_pool->opt_total_open > 0);
909
 
                table_pool->opt_total_open--;
910
 
 
911
 
                ot->ot_thread = self;
912
 
                xt_close_table(ot, table_pool->opt_total_open == 0, FALSE);
913
 
 
914
 
                /* Go to the next: */
915
 
                ot = tmp_ot;
916
 
        }
917
 
 
918
 
        /* Wait for other to close: */
919
 
        while (table_pool->opt_total_open > 0) {
920
 
                xt_timed_wait_cond_ns(&db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
921
 
        }
922
 
 
923
 
        /* 2nd phase, now the table is really locked: */
924
 
        table_pool->opt_locked = XT_TABLE_LOCKED;
925
 
 
926
 
        freer_(); // xt_unlock_mutex(db_ot_pool.opt_lock)
927
 
        return table_pool;
928
 
}
929
 
 
930
 
/*
931
 
 * This function locks a particular table by locking the table directory
932
 
 * and waiting for all open tables handles to close.
933
 
 *
934
 
 * Things are a bit complicated because the sweeper must be turned off before
935
 
 * the table directory is locked.
936
 
 */
937
 
xtPublic XTOpenTablePoolPtr xt_db_lock_table_pool_by_name(XTThreadPtr self, XTDatabaseHPtr db, XTPathStrPtr tab_name, xtBool no_load, xtBool flush_table, xtBool missing_ok, XTTableHPtr *ret_tab)
938
 
{
939
 
        XTOpenTablePoolPtr      table_pool;
940
 
        XTTableHPtr                     tab;
941
 
        xtTableID                       tab_id;
942
 
 
943
 
        pushsr_(tab, xt_heap_release, xt_use_table(self, tab_name, no_load, missing_ok));
944
 
        if (!tab) {
945
 
                freer_(); // xt_heap_release(tab)
946
 
                return NULL;
947
 
        }
948
 
 
949
 
        tab_id = tab->tab_id;
950
 
 
951
 
        if (ret_tab) {
952
 
                *ret_tab = tab;
953
 
                table_pool = db_lock_table_pool(self, db, tab_id, flush_table);
954
 
                popr_(); // Discard xt_heap_release(tab)
955
 
                return table_pool;
956
 
        }
957
 
 
958
 
        freer_(); // xt_heap_release(tab)
959
 
        return db_lock_table_pool(self, db, tab_id, flush_table);
960
 
}
961
 
 
962
 
xtPublic void xt_db_unlock_table_pool(XTThreadPtr self, XTOpenTablePoolPtr table_pool)
963
 
{
964
 
        XTDatabaseHPtr db;
965
 
 
966
 
        if (!table_pool)
967
 
                return;
968
 
 
969
 
        db = table_pool->opt_db;
970
 
        xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
971
 
        pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
972
 
 
973
 
        table_pool->opt_locked = XT_TABLE_NOT_LOCKED;
974
 
        xt_broadcast_cond(self, &db->db_ot_pool.opt_cond);
975
 
        db_free_open_table_pool(NULL, table_pool);
976
 
 
977
 
        freer_(); // xt_unlock_mutex(db_ot_pool.opt_lock)
978
 
}
979
 
 
980
 
xtPublic XTOpenTablePtr xt_db_open_table_using_tab(XTTableHPtr tab, XTThreadPtr thread)
981
 
{
982
 
        XTDatabaseHPtr          db = tab->tab_db;
983
 
        XTOpenTablePoolPtr      table_pool;
984
 
        XTOpenTablePtr          ot;
985
 
 
986
 
        xt_lock_mutex_ns(&db->db_ot_pool.opt_lock);
987
 
 
988
 
        if (!(table_pool = db_get_open_table_pool(db, tab->tab_id)))
989
 
                goto failed;
990
 
 
991
 
        while (table_pool->opt_locked) {
992
 
                if (!xt_timed_wait_cond_ns(&db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000))
993
 
                        goto failed_1;
994
 
                if (!(table_pool = db_get_open_table_pool(db, tab->tab_id)))
995
 
                        goto failed;
996
 
        }
997
 
 
998
 
        if ((ot = table_pool->opt_free_list)) {
999
 
                /* Remove from the free list: */
1000
 
                table_pool->opt_free_list = ot->ot_otp_next_free;
1001
 
                
1002
 
                /* Remove from MRU list: */
1003
 
                if (db->db_ot_pool.otp_lr_used == ot)
1004
 
                        db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
1005
 
                if (db->db_ot_pool.otp_mr_used == ot)
1006
 
                        db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
1007
 
                if (ot->ot_otp_lr_used)
1008
 
                        ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
1009
 
                if (ot->ot_otp_mr_used)
1010
 
                        ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
1011
 
 
1012
 
                if (db->db_ot_pool.otp_lr_used)
1013
 
                        db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
1014
 
 
1015
 
                ASSERT_NS(db->db_ot_pool.otp_total_free > 0);
1016
 
                db->db_ot_pool.otp_total_free--;
1017
 
 
1018
 
                ot->ot_thread = thread;
1019
 
                goto done_ok;
1020
 
        }
1021
 
 
1022
 
        if ((ot = xt_open_table(tab))) {
1023
 
                ot->ot_thread = thread;
1024
 
                table_pool->opt_total_open++;
1025
 
        }
1026
 
 
1027
 
        done_ok:
1028
 
        db_free_open_table_pool(NULL, table_pool);
1029
 
        xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1030
 
        return ot;
1031
 
 
1032
 
        failed_1:
1033
 
        db_free_open_table_pool(NULL, table_pool);
1034
 
 
1035
 
        failed:
1036
 
        xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1037
 
        return NULL;
1038
 
}
1039
 
 
1040
 
xtPublic xtBool xt_db_open_pool_table_ns(XTOpenTablePtr *ret_ot, XTDatabaseHPtr db, xtTableID tab_id)
1041
 
{
1042
 
        XTThreadPtr     self = xt_get_self();
1043
 
        xtBool          ok = TRUE;
1044
 
 
1045
 
        try_(a) {
1046
 
                *ret_ot = xt_db_open_pool_table(self, db, tab_id, NULL, FALSE);
1047
 
        }
1048
 
        catch_(a) {
1049
 
                ok = FALSE;
1050
 
        }
1051
 
        cont_(a);
1052
 
        return ok;
1053
 
}
1054
 
 
1055
 
xtPublic XTOpenTablePtr xt_db_open_pool_table(XTThreadPtr self, XTDatabaseHPtr db, xtTableID tab_id, int *result, xtBool i_am_background)
1056
 
{
1057
 
        XTOpenTablePtr          ot;
1058
 
        XTOpenTablePoolPtr      table_pool;
1059
 
        XTTableHPtr                     tab;
1060
 
 
1061
 
        xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
1062
 
        pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
1063
 
 
1064
 
        if (!(table_pool = db_get_open_table_pool(db, tab_id)))
1065
 
                xt_throw(self);
1066
 
 
1067
 
        /* Background processes do not have to wait while flushing!
1068
 
         *
1069
 
         * I think I did this so that the background process would
1070
 
         * not hang during flushing. Exact reason currently
1071
 
         * unknown (maybe not any more see {HANG-ON-FREEER} below).
1072
 
         *
1073
 
         * NOTE: I have taken out check:
1074
 
         * && !(i_am_background && table_pool->opt_flushing) below.
1075
 
         * To which the text above refers. The reason is because it
1076
 
         * has been replaced by the more general rule (described
1077
 
         * below). This was used to ensure that background
1078
 
         * threads do not hang when the table is flushed
1079
 
         * in db_lock_table_pool(). Now (see below) I make
1080
 
         * sure that a background thread does not hang during a
1081
 
         * lock table, as long as the locker is still waiting!
1082
 
         *
1083
 
         * The fact that a background thread can get a open table
1084
 
         * handle while a user thread is locking and flushing a table
1085
 
         * led to the situation that the checkpointer
1086
 
         * could flush at the same time as a user process
1087
 
         * which was flushing due to a rename.
1088
 
         *
1089
 
         * This led to the situation described here: {FLUSH-BUG},
1090
 
         * which is now fixed.
1091
 
         *
1092
 
         * {HANG-ON-FREEER}
1093
 
         * 
1094
 
         * This error occurred during count_distinct3
1095
 
         *
1096
 
         * The sweeper is waiting for the free'er, but the sweeper has a table
1097
 
         * open (Table ./test/t2 test, ID 2)
1098
 
         *
1099
 
         * if (!xt_timed_wait_cond_ns(&dcg->tcm_freeer_cond, &dcg->tcm_freeer_lock, 30000)) {
1100
 
         *      dcg->tcm_threads_waiting--;
1101
 
         *      break;
1102
 
         * }
1103
 
         * #3   0x00e2a5b6 in xt_p_cond_timedwait at pthread_xt.cc:697
1104
 
         * #4   0x00e51543 in xt_timed_wait_cond at thread_xt.cc:2053
1105
 
         * #5   0x00e38fac in XTTabCache::tc_fetch at tabcache_xt.cc:682
1106
 
         * #6   0x00e3974c in XTTabCache::xt_tc_write_cond at tabcache_xt.cc:279
1107
 
         * #7   0x00e5ce31 in xn_sw_cleanup_variation at xaction_xt.cc:2101
1108
 
         * #8   0x00e5d619 in xn_sw_cleanup_xact at xaction_xt.cc:2327
1109
 
         * #9   0x00e5dddd in xn_sw_main at xaction_xt.cc:2608
1110
 
         * #10  0x00e5e123 in xn_sw_run_thread at xaction_xt.cc:2741
1111
 
         * #11  0x00e50640 in thr_main at thread_xt.cc:1081
1112
 
         * #12  0x91fdb155 in _pthread_start
1113
 
         * #13  0x91fdb012 in thread_start
1114
 
         *
1115
 
         * The user thread is trying to drop the table but the table has 1 open table
1116
 
         * (Table ./test/t2 test, ID 2)
1117
 
         *
1118
 
         * while (table_pool->opt_total_open > 0) {
1119
 
         *      xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
1120
 
         * }
1121
 
         * #3   0x00e2a5b6 in xt_p_cond_timedwait at pthread_xt.cc:697
1122
 
         * #4   0x00e51543 in xt_timed_wait_cond at thread_xt.cc:2053
1123
 
         * But fix: xt_db_wait_for_open_tables() has been removed!
1124
 
         * #5   0x00de88a2 in xt_db_wait_for_open_tables at database_xt.cc:966
1125
 
         * #6   0x00e3e983 in tab_lock_table at table_xt.cc:1557
1126
 
         * #7   0x00e3eb1e in xt_drop_table at table_xt.cc:1941
1127
 
         * #8   0x00e0e78e in ha_pbxt::delete_table at ha_pbxt.cc:4848
1128
 
         * #9   0x00243a14 in handler::ha_delete_table at handler.cc:3311
1129
 
         * #10  0x00243b7e in ha_delete_table at handler.cc:1954
1130
 
         * #11  0x0025c990 in mysql_rm_table_part2 at sql_table.cc:1724
1131
 
         * #12  0x0025cee8 in mysql_rm_table at sql_table.cc:1518
1132
 
         * #13  0x0011256c in mysql_execute_command at sql_parse.cc:3357
1133
 
         * #14  0x001188ed in mysql_parse at sql_parse.cc:5929
1134
 
         * #15  0x00119663 in dispatch_command at sql_parse.cc:1216
1135
 
         * #16  0x0011a960 in do_command at sql_parse.cc:857
1136
 
         * #17  0x00105a3c in handle_one_connection at sql_connect.cc:1115
1137
 
         * #18  0x91fdb155 in _pthread_start
1138
 
         * #19  0x91fdb012 in thread_start
1139
 
         * 
1140
 
         * The free'er would like to free some memory but cannot becuase it requires
1141
 
         * an open table handle, but the pool is locked by the user thread which
1142
 
         * wants to drop the table.
1143
 
         *
1144
 
         * // Free'er wants to get an open table, but the pool is locked (by the renamer)
1145
 
         * while (table_pool->opt_locked && !(i_am_background && table_pool->opt_flushing)) {
1146
 
         *      xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
1147
 
         *      if (!(table_pool = db_get_open_table_pool(db, tab_id)))
1148
 
         *              xt_throw(self);
1149
 
         * }
1150
 
         * #0   0x91fb146e in __semwait_signal
1151
 
         * #1   0x91fdc3e6 in _pthread_cond_wait
1152
 
         * #2   0x920019f8 in pthread_cond_timedwait$UNIX2003
1153
 
         * #3   0x00e2a5b6 in xt_p_cond_timedwait at pthread_xt.cc:697
1154
 
         * #4   0x00e51543 in xt_timed_wait_cond at thread_xt.cc:2053
1155
 
         * #5   0x00de8d8a in xt_db_open_pool_table at database_xt.cc:1091
1156
 
         * #6   0x00e39ce3 in tabc_get_table at tabcache_xt.cc:983
1157
 
         * #7   0x00e39deb in tabc_free_page at tabcache_xt.cc:1028
1158
 
         * #8   0x00e3a5d2 in tabc_fr_main at tabcache_xt.cc:1227
1159
 
         * #9   0x00e3a954 in tabc_fr_run_thread at tabcache_xt.cc:1290
1160
 
         * #10  0x00e50640 in thr_main at thread_xt.cc:1081
1161
 
         * #11  0x91fdb155 in _pthread_start
1162
 
         * #12  0x91fdb012 in thread_start
1163
 
         *
1164
 
         * My proposed solution:
1165
 
         *
1166
 
         * Firstly, It can be assumed that background (system processes) will eventually
1167
 
         * stop activity, once all work has been done.
1168
 
         * So allowing them to proceed, even when the table is locked will
1169
 
         * not lead to a livelock.
1170
 
         *
1171
 
         * This means that we should allow background processes to proceed with a
1172
 
         * locked table, as long as the locker is waiting.
1173
 
         */
1174
 
        while (table_pool->opt_locked) {
1175
 
                if (i_am_background && table_pool->opt_locked != XT_TABLE_LOCKED) {
1176
 
                        /* Background processes can proceed, if the locker is not in the
1177
 
                         * final lock phase!
1178
 
                         */
1179
 
                        break;
1180
 
                }
1181
 
                xt_timed_wait_cond(self, &db->db_ot_pool.opt_cond, &db->db_ot_pool.opt_lock, 2000);
1182
 
                if (!(table_pool = db_get_open_table_pool(db, tab_id)))
1183
 
                        xt_throw(self);
1184
 
        }
1185
 
 
1186
 
        /* Moved from above, because db_get_open_table_pool() may return a different
1187
 
         * pool on each call!
1188
 
        */
1189
 
        pushr_(db_free_open_table_pool, table_pool);    
1190
 
        
1191
 
        if ((ot = table_pool->opt_free_list)) {
1192
 
                /* Remove from the free list: */
1193
 
                table_pool->opt_free_list = ot->ot_otp_next_free;
1194
 
                
1195
 
                /* Remove from MRU list: */
1196
 
                if (db->db_ot_pool.otp_lr_used == ot)
1197
 
                        db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
1198
 
                if (db->db_ot_pool.otp_mr_used == ot)
1199
 
                        db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
1200
 
                if (ot->ot_otp_lr_used)
1201
 
                        ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
1202
 
                if (ot->ot_otp_mr_used)
1203
 
                        ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
1204
 
 
1205
 
                if (db->db_ot_pool.otp_lr_used)
1206
 
                        db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
1207
 
 
1208
 
                ASSERT(db->db_ot_pool.otp_total_free > 0);
1209
 
                db->db_ot_pool.otp_total_free--;
1210
 
 
1211
 
                freer_(); // db_free_open_table_pool(table_pool)
1212
 
                freer_(); // xt_unlock_mutex(&db->db_ot_pool.opt_lock)
1213
 
                ot->ot_thread = self;
1214
 
                return ot;
1215
 
        }
1216
 
 
1217
 
        if (!(tab = xt_use_table_by_id(self, db, tab_id, result))) {
1218
 
                /* The table no longer exists, ignore the change: */
1219
 
                freer_(); // db_free_open_table_pool(table_pool)
1220
 
                freer_(); // xt_unlock_mutex(&db->db_ot_pool.opt_lock)
1221
 
                return NULL;
1222
 
        }
1223
 
 
1224
 
        /* xt_use_table_by_id returns a referenced tab! */
1225
 
        pushr_(xt_heap_release, tab);
1226
 
        if ((ot = xt_open_table(tab))) {
1227
 
                ot->ot_thread = self;
1228
 
                table_pool->opt_total_open++;
1229
 
        }
1230
 
        freer_(); // xt_release_heap(tab)
1231
 
 
1232
 
        freer_(); // db_free_open_table_pool(table_pool)
1233
 
        freer_(); // xt_unlock_mutex(&db->db_ot_pool.opt_lock)
1234
 
        return ot;
1235
 
}
1236
 
 
1237
 
xtPublic void xt_db_return_table_to_pool(XTThreadPtr XT_UNUSED(self), XTOpenTablePtr ot)
1238
 
{
1239
 
        xt_db_return_table_to_pool_ns(ot);
1240
 
}
1241
 
 
1242
 
xtPublic void xt_db_return_table_to_pool_ns(XTOpenTablePtr ot)
1243
 
{
1244
 
        XTOpenTablePoolPtr      table_pool;
1245
 
        XTDatabaseHPtr          db = ot->ot_table->tab_db;
1246
 
        xtBool                          flush_table = TRUE;
1247
 
 
1248
 
        /* No open table returned to the pool should still
1249
 
         * have a cache handle!
1250
 
         */
1251
 
        ASSERT_NS(!ot->ot_ind_rhandle);
1252
 
        xt_lock_mutex_ns(&db->db_ot_pool.opt_lock);
1253
 
 
1254
 
        if (!(table_pool = db_get_open_table_pool(db, ot->ot_table->tab_id)))
1255
 
                goto failed;
1256
 
 
1257
 
        if (table_pool->opt_locked) {
1258
 
                /* Table will be closed below, because the table is
1259
 
                 * locked: */
1260
 
                if (table_pool->opt_total_open > 1)
1261
 
                        flush_table = FALSE;
1262
 
        }
1263
 
        else {
1264
 
                /* Put it on the free list: */
1265
 
                db->db_ot_pool.otp_total_free++;
1266
 
 
1267
 
                ot->ot_otp_next_free = table_pool->opt_free_list;
1268
 
                table_pool->opt_free_list = ot;
1269
 
 
1270
 
                /* This is the time the table was freed: */
1271
 
                ot->ot_otp_free_time = xt_db_approximate_time;
1272
 
 
1273
 
                /* Add to most recently used: */
1274
 
                if ((ot->ot_otp_lr_used = db->db_ot_pool.otp_mr_used))
1275
 
                        db->db_ot_pool.otp_mr_used->ot_otp_mr_used = ot;
1276
 
                ot->ot_otp_mr_used = NULL;
1277
 
                db->db_ot_pool.otp_mr_used = ot;
1278
 
                if (!db->db_ot_pool.otp_lr_used) {
1279
 
                        db->db_ot_pool.otp_lr_used = ot;
1280
 
                        db->db_ot_pool.otp_free_time = ot->ot_otp_free_time;
1281
 
                }
1282
 
 
1283
 
                ot = NULL;
1284
 
        }
1285
 
 
1286
 
        if (ot) {
1287
 
                xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1288
 
                xt_close_table(ot, flush_table, FALSE);
1289
 
 
1290
 
                /* assume that table_pool cannot be invalidated in between as we have table_pool->opt_total_open > 0 */
1291
 
                xt_lock_mutex_ns(&db->db_ot_pool.opt_lock);
1292
 
                table_pool->opt_total_open--;
1293
 
        }
1294
 
 
1295
 
        db_free_open_table_pool(NULL, table_pool);
1296
 
 
1297
 
        if (!xt_broadcast_cond_ns(&db->db_ot_pool.opt_cond))
1298
 
                goto failed;
1299
 
        xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1300
 
        
1301
 
        return;
1302
 
 
1303
 
        failed:
1304
 
        xt_unlock_mutex_ns(&db->db_ot_pool.opt_lock);
1305
 
        if (ot)
1306
 
                xt_close_table(ot, TRUE, FALSE);
1307
 
        xt_log_and_clear_exception_ns();
1308
 
}
1309
 
 
1310
 
//#define TEST_FREE_OPEN_TABLES
1311
 
 
1312
 
#ifdef DEBUG
1313
 
#undef XT_OPEN_TABLE_FREE_TIME
1314
 
#define XT_OPEN_TABLE_FREE_TIME                 5
1315
 
#endif
1316
 
 
1317
 
xtPublic void xt_db_free_unused_open_tables(XTThreadPtr self, XTDatabaseHPtr db)
1318
 
{
1319
 
        XTOpenTablePoolPtr      table_pool;
1320
 
        size_t                          count;
1321
 
        XTOpenTablePtr          ot;
1322
 
        xtBool                          flush_table = TRUE;
1323
 
        u_int                           table_count;
1324
 
 
1325
 
        /* A quick check of the oldest free table: */
1326
 
        if (xt_db_approximate_time < db->db_ot_pool.otp_free_time + XT_OPEN_TABLE_FREE_TIME)
1327
 
                return;
1328
 
 
1329
 
        table_count = db->db_table_by_id ? xt_sl_get_size(db->db_table_by_id) : 0;
1330
 
        count = table_count * 3;
1331
 
        if (count < 20)
1332
 
                count = 20;
1333
 
#ifdef TEST_FREE_OPEN_TABLES
1334
 
        count = 10;
1335
 
#endif
1336
 
        if (db->db_ot_pool.otp_total_free > count) {
1337
 
                XTOpenTablePtr  ptr, pptr;
1338
 
 
1339
 
                count = table_count * 2;
1340
 
                if (count < 10)
1341
 
                        count = 10;
1342
 
#ifdef TEST_FREE_OPEN_TABLES
1343
 
                count = 5;
1344
 
#endif
1345
 
                xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
1346
 
                pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
1347
 
 
1348
 
                while (db->db_ot_pool.otp_total_free > count) {
1349
 
                        ASSERT_NS(db->db_ot_pool.otp_lr_used);
1350
 
                        if (!(ot = db->db_ot_pool.otp_lr_used))
1351
 
                                break;
1352
 
 
1353
 
                        /* Check how long the open table has been free: */
1354
 
                        if (xt_db_approximate_time < ot->ot_otp_free_time + XT_OPEN_TABLE_FREE_TIME)
1355
 
                                break;
1356
 
 
1357
 
                        ot->ot_thread = self;
1358
 
 
1359
 
                        /* Remove from MRU list: */
1360
 
                        db->db_ot_pool.otp_lr_used = ot->ot_otp_mr_used;
1361
 
                        if (db->db_ot_pool.otp_mr_used == ot)
1362
 
                                db->db_ot_pool.otp_mr_used = ot->ot_otp_lr_used;
1363
 
                        if (ot->ot_otp_lr_used)
1364
 
                                ot->ot_otp_lr_used->ot_otp_mr_used = ot->ot_otp_mr_used;
1365
 
                        if (ot->ot_otp_mr_used)
1366
 
                                ot->ot_otp_mr_used->ot_otp_lr_used = ot->ot_otp_lr_used;
1367
 
 
1368
 
                        if (db->db_ot_pool.otp_lr_used)
1369
 
                                db->db_ot_pool.otp_free_time = db->db_ot_pool.otp_lr_used->ot_otp_free_time;
1370
 
 
1371
 
                        ASSERT(db->db_ot_pool.otp_total_free > 0);
1372
 
                        db->db_ot_pool.otp_total_free--;
1373
 
 
1374
 
                        if (!(table_pool = db_get_open_table_pool(db, ot->ot_table->tab_id)))
1375
 
                                xt_throw(self);
1376
 
 
1377
 
                        /* Find the open table in the table pool,
1378
 
                         * and remove it from the list:
1379
 
                         */
1380
 
                        pptr = NULL;
1381
 
                        ptr = table_pool->opt_free_list;
1382
 
                        while (ptr) {
1383
 
                                if (ptr == ot)
1384
 
                                        break;
1385
 
                                pptr = ptr;
1386
 
                                ptr = ptr->ot_otp_next_free;
1387
 
                        }
1388
 
 
1389
 
                        ASSERT_NS(ptr == ot);
1390
 
                        if (ptr == ot) {
1391
 
                                if (pptr)
1392
 
                                        pptr->ot_otp_next_free = ot->ot_otp_next_free;
1393
 
                                else
1394
 
                                        table_pool->opt_free_list = ot->ot_otp_next_free;
1395
 
                        }
1396
 
 
1397
 
                        ASSERT_NS(table_pool->opt_total_open > 0);
1398
 
                        table_pool->opt_total_open--;
1399
 
                        if (table_pool->opt_total_open > 0)
1400
 
                                flush_table = FALSE;
1401
 
                        else
1402
 
                                flush_table = TRUE;
1403
 
 
1404
 
                        db_free_open_table_pool(self, table_pool);
1405
 
 
1406
 
                        freer_();
1407
 
 
1408
 
                        /* Close the table, but not
1409
 
                         * while holding the lock.
1410
 
                         */
1411
 
                        xt_close_table(ot, flush_table, FALSE);
1412
 
 
1413
 
                        xt_lock_mutex(self, &db->db_ot_pool.opt_lock);
1414
 
                        pushr_(xt_unlock_mutex, &db->db_ot_pool.opt_lock);
1415
 
                }
1416
 
 
1417
 
                freer_();
1418
 
        }
1419
 
}
1420
 
 
1421
 
/*
1422
 
 * -----------------------------------------------------------------------
1423
 
 * THE THREAD POOL
1424
 
 */
1425
 
 
1426
 
static void db_notify_all(XTTask *tk)
1427
 
{
1428
 
        XTThreadPtr target;
1429
 
 
1430
 
        for (u_int i=0; ; i++) {
1431
 
                if (!(target = (XTThreadPtr) tk->tk_notify_threads.pl_get_pointer(i)))
1432
 
                        break;
1433
 
 
1434
 
                /* The task is on the thread's to-do list: */
1435
 
                xt_lock_thread(target);
1436
 
                if (target->st_tasks_todo.pl_remove_pointer(tk)) {
1437
 
                        /* Notify the thread that there are no more tasks to do... */
1438
 
                        if (target->st_tasks_todo.pl_is_empty())
1439
 
                                xt_signal_thread(target);
1440
 
                        /* Release tasks that were on the to-do list: */
1441
 
                        tk->tk_release();
1442
 
                }
1443
 
                xt_unlock_thread(target);
1444
 
        }
1445
 
        tk->tk_notify_threads.pl_clear();
1446
 
}
1447
 
 
1448
 
static void db_thread_pool_main(XTThreadPtr self)
1449
 
{
1450
 
        XTDatabaseHPtr  db = self->st_database;
1451
 
        XTTask                  *tk;
1452
 
        XTThreadPtr             target;
1453
 
        xtBool                  job_done = FALSE;
1454
 
 
1455
 
        for (;;) {
1456
 
                xt_lock_mutex(self, &db->db_pool_lock);
1457
 
                pushr_(xt_unlock_mutex, &db->db_pool_lock);
1458
 
 
1459
 
                if (job_done) {
1460
 
                        db->db_pool_job_count--;
1461
 
                        job_done = FALSE;
1462
 
                }
1463
 
                
1464
 
                if (self->t_quit) {
1465
 
                        freer_();
1466
 
                        break;
1467
 
                }
1468
 
 
1469
 
                while (!self->t_quit && !(tk = db->db_task_queue_front)) {
1470
 
                        /* Wait for 1/10 second (to ensure we quit on time): */
1471
 
                        xt_timed_wait_cond(self, &db->db_pool_cond, &db->db_pool_lock, 100);
1472
 
                }
1473
 
 
1474
 
                if (self->t_quit) {
1475
 
                        freer_();
1476
 
                        break;
1477
 
                }
1478
 
 
1479
 
                db->db_task_queue_front = tk->tk_task_list_next;
1480
 
                if (!db->db_task_queue_front)
1481
 
                        db->db_task_queue_back = NULL;
1482
 
 
1483
 
                freer_();
1484
 
 
1485
 
                /* Perform the task: */
1486
 
                job_done = TRUE;
1487
 
                if (!tk->tk_task(self)) {
1488
 
                        /* Transfer error to the task: */
1489
 
                        tk->tk_success = false;
1490
 
                        if ((tk->tk_exception = (XTExceptionPtr) xt_malloc_ns(sizeof(XTExceptionRec))))
1491
 
                                *tk->tk_exception = self->t_exception;
1492
 
                        else
1493
 
                                tk->tk_out_of_memory = true;
1494
 
                }
1495
 
                else
1496
 
                        tk->tk_success = true;
1497
 
 
1498
 
                tk->tk_lock();
1499
 
                tk->tk_running = FALSE;
1500
 
 
1501
 
                /* Notify any there were forgotten: */
1502
 
                db_notify_all(tk);
1503
 
 
1504
 
                /* The task is done: */
1505
 
                if (tk->tk_waiting_threads.pl_is_empty()) {
1506
 
                        /* No waiting tasks, log the error: */
1507
 
                        if (!tk->tk_success)
1508
 
                                xt_log_and_clear_exception(self);
1509
 
                }
1510
 
                else {
1511
 
                        for (u_int i=0; ; i++) {
1512
 
                                if (!(target = (XTThreadPtr) tk->tk_waiting_threads.pl_get_pointer(i)))
1513
 
                                        break;
1514
 
 
1515
 
                                /* The task is on the thread's to-do list: */
1516
 
                                xt_lock_thread(target);
1517
 
                                if (target->st_tasks_todo.pl_remove_pointer(tk)) {
1518
 
                                        /* Add to the done list: */
1519
 
                                        if (!target->st_tasks_done.pl_add_pointer(tk)) {
1520
 
                                                tk->tk_release();
1521
 
                                                xt_log_and_clear_exception(self);
1522
 
                                        }
1523
 
 
1524
 
                                        /* Notify the thread that there are no more tasks to do... */
1525
 
                                        if (target->st_tasks_todo.pl_is_empty())
1526
 
                                                xt_signal_thread(target);
1527
 
                                }
1528
 
                                xt_unlock_thread(target);
1529
 
                        }
1530
 
                        tk->tk_waiting_threads.pl_clear();
1531
 
                }
1532
 
 
1533
 
                tk->tk_unlock();
1534
 
                /* We assume the reference is required to take the lock!
1535
 
                 */
1536
 
                tk->tk_release();
1537
 
        }
1538
 
}
1539
 
 
1540
 
typedef struct DBThreadData {
1541
 
        XTDatabaseHPtr  td_db;
1542
 
} DBThreadDataRec, *DBThreadDataPtr;
1543
 
 
1544
 
static void *db_thread_pool_run_thread(XTThreadPtr self)
1545
 
{
1546
 
        DBThreadDataPtr td = (DBThreadDataPtr) self->t_data;
1547
 
        XTDatabaseHPtr  db = td->td_db;
1548
 
        volatile int                            i = 0;
1549
 
 
1550
 
        /* Note, the MySQL thread will be free when the this
1551
 
         * thread quits.
1552
 
         */
1553
 
        if (!myxt_create_thread())
1554
 
                xt_throw(self);
1555
 
 
1556
 
        while (!self->t_quit && i<10) {
1557
 
                try_(a) {
1558
 
                        /* Use the database: */
1559
 
                        xt_use_database(self, db, XT_FOR_POOL);
1560
 
 
1561
 
                        /* {BACKGROUND-RELEASE-DB} */
1562
 
                        xt_heap_release(self, self->st_database);
1563
 
 
1564
 
                        db_thread_pool_main(self);
1565
 
                }
1566
 
                catch_(a) {
1567
 
                        /* This error is "normal"! */
1568
 
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
1569
 
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
1570
 
                                self->t_exception.e_sys_err == SIGTERM))
1571
 
                                xt_log_and_clear_exception(self);
1572
 
                }
1573
 
                cont_(a);
1574
 
 
1575
 
                /* Avoid releasing the database (done above) */
1576
 
                self->st_database = NULL;
1577
 
                xt_unuse_database(self, self);
1578
 
 
1579
 
                if (self->t_quit)
1580
 
                        break;
1581
 
 
1582
 
                /* Pause, in case the error repeats: */
1583
 
                xt_sleep_milli_second(10000);
1584
 
                i++;
1585
 
        }
1586
 
 
1587
 
        return NULL;
1588
 
}
1589
 
 
1590
 
static void db_free_pool_thread(XTThreadPtr self, void *data)
1591
 
{
1592
 
        DBThreadDataPtr         td = (DBThreadDataPtr) data;
1593
 
        XTDatabaseHPtr          db = td->td_db;
1594
 
        XTThreadPtr                     thread, pthread;
1595
 
 
1596
 
        xt_free(self, data);
1597
 
 
1598
 
        xt_lock_mutex(self, &db->db_pool_lock);
1599
 
        pushr_(xt_unlock_mutex, &db->db_pool_lock);
1600
 
 
1601
 
        /* Remove the thread from the pool: */
1602
 
        pthread = NULL;
1603
 
        thread = db->db_thread_pool;
1604
 
        while (thread != self) {
1605
 
                pthread = thread;
1606
 
                thread = thread->st_pool_next;
1607
 
        }
1608
 
        if (thread == self) {
1609
 
                db->db_pool_thread_count--;
1610
 
                if (pthread)
1611
 
                        pthread->st_pool_next = thread->st_pool_next;
1612
 
                else
1613
 
                        db->db_thread_pool = thread->st_pool_next;
1614
 
        }
1615
 
 
1616
 
        freer_(); // xt_unlock_mutex(&db->db_pool_lock)
1617
 
}
1618
 
 
1619
 
static XTThreadPtr db_create_pool_thread(XTDatabaseHPtr db)
1620
 
{
1621
 
        DBThreadDataPtr td;
1622
 
        char                    name[PATH_MAX];
1623
 
        XTThreadPtr             thread;
1624
 
 
1625
 
 
1626
 
        /* Note, this is just a test to see if we can create a MySQL thread.
1627
 
         * On shutdown, this is sometimes not possible!
1628
 
         */
1629
 
        if (!myxt_create_thread_possible())
1630
 
                return NULL;
1631
 
 
1632
 
        if (!(td = (DBThreadDataPtr) xt_malloc_ns(sizeof(DBThreadDataRec))))
1633
 
                return NULL;
1634
 
        td->td_db = db;
1635
 
 
1636
 
        db->db_pool_thread_count++;
1637
 
        sprintf(name, "I/O-%s", xt_last_directory_of_path(db->db_main_path));
1638
 
        xt_remove_dir_char(name);
1639
 
        xt_strcat(PATH_MAX, name, "-");
1640
 
        xt_strcati(PATH_MAX, name, db->db_pool_thread_count);
1641
 
        if (!(thread = xt_create_daemon_ns(name)))
1642
 
                goto failed;
1643
 
 
1644
 
        thread->st_pool_next = db->db_thread_pool;
1645
 
        db->db_thread_pool = thread;
1646
 
 
1647
 
        xt_set_thread_data(thread, td, db_free_pool_thread);
1648
 
 
1649
 
        return thread;
1650
 
 
1651
 
        failed:
1652
 
        xt_free_ns(td);
1653
 
        return NULL;
1654
 
}
1655
 
 
1656
 
/*
1657
 
 * Create a pool thread.
1658
 
 *
1659
 
 * This ensures that there is at least one pool thread.
1660
 
 *
1661
 
 * The advantage is that if, for some reason, no more threads
1662
 
 * can be created, then we have one that can do all the work.
1663
 
 */
1664
 
static void db_create_pool_thread(XTThreadPtr self, XTDatabaseHPtr db)
1665
 
{
1666
 
        XTThreadPtr     pool_thread = NULL;
1667
 
 
1668
 
        if (!(pool_thread = db_create_pool_thread(db)))
1669
 
                xt_throw(self);
1670
 
 
1671
 
        xt_run_thread(self, pool_thread, db_thread_pool_run_thread);
1672
 
}
1673
 
 
1674
 
xtPublic void xt_db_thread_pool_init(XTThreadPtr self, XTDatabaseHPtr db)
1675
 
{
1676
 
        xt_init_mutex_with_autoname(self, &db->db_pool_lock);
1677
 
        xt_init_cond(self, &db->db_pool_cond);
1678
 
        db_create_pool_thread(self, db);
1679
 
}
1680
 
 
1681
 
xtPublic void xt_db_thread_pool_exit(XTThreadPtr self, XTDatabaseHPtr db)
1682
 
{
1683
 
        xt_db_stop_pool_threads(self, db);
1684
 
        xt_free_mutex(&db->db_pool_lock);
1685
 
        xt_free_cond(&db->db_pool_cond);
1686
 
}
1687
 
 
1688
 
xtPublic void xt_db_stop_pool_threads(XTThreadPtr self, XTDatabaseHPtr db)
1689
 
{
1690
 
        XTThreadPtr     thread;
1691
 
        xtThreadID      tid;
1692
 
 
1693
 
        if (db->db_thread_pool) {
1694
 
                xt_lock_mutex(self, &db->db_pool_lock);
1695
 
                pushr_(xt_unlock_mutex, &db->db_pool_lock);
1696
 
 
1697
 
                while ((thread = db->db_thread_pool)) {
1698
 
                        tid = thread->t_id;
1699
 
 
1700
 
                        xt_terminate_thread(self, thread);
1701
 
                        xt_broadcast_cond(self, &db->db_pool_cond);
1702
 
 
1703
 
                        freer_(); // xt_unlock_mutex(&db->db_pool_lock)
1704
 
 
1705
 
                        xt_wait_for_thread_to_exit(tid, FALSE);
1706
 
 
1707
 
                        xt_lock_mutex(self, &db->db_pool_lock);
1708
 
                        pushr_(xt_unlock_mutex, &db->db_pool_lock);
1709
 
                }
1710
 
 
1711
 
                freer_(); // xt_unlock_mutex(&db->db_pool_lock)
1712
 
        }
1713
 
}
1714
 
 
1715
 
/*
1716
 
 * notify_complete means the thread will wait for the task to complete, and process the result.
1717
 
 * notify_early means the thread will wait for a notification from the task itself.
1718
 
 *
1719
 
 * Note. we assume the caller has a reference to the task. The caller is also responsible for
1720
 
 * freeing the reference!
1721
 
 *
1722
 
 * This function will take extra references as required!
1723
 
 */
1724
 
xtPublic xtBool xt_run_async_task(XTTask *tk, xtBool notify_complete, xtBool notify_early, XTThreadPtr thread, XTDatabaseHPtr db)
1725
 
{
1726
 
        tk->tk_lock();
1727
 
        if (notify_complete || notify_early) {
1728
 
                xt_lock_thread(thread);
1729
 
                /* Count the reference is the to-do list of the thread. */
1730
 
                tk->tk_reference();
1731
 
                if (!thread->st_tasks_todo.pl_add_pointer(tk)) {
1732
 
                        tk->tk_release();
1733
 
                        xt_unlock_thread(thread);
1734
 
                        goto failed_0;
1735
 
                }
1736
 
                xt_unlock_thread(thread);
1737
 
                
1738
 
                if (notify_complete) {
1739
 
                        if (!tk->tk_waiting_threads.pl_add_pointer(thread))
1740
 
                                goto failed_1;
1741
 
                }
1742
 
                else {
1743
 
                        if (!tk->tk_notify_threads.pl_add_pointer(thread))
1744
 
                                goto failed_1;
1745
 
                }
1746
 
        }
1747
 
 
1748
 
        if (!tk->tk_running) {
1749
 
                XTThreadPtr     pool_thread = NULL;
1750
 
 
1751
 
                /* Note, the caller should already have a reference,
1752
 
                 * otherwise it would not be safe to access the task.
1753
 
                 *
1754
 
                 * When running, we add one more reference. The reference
1755
 
                 * is owned by the running thread.
1756
 
                 *
1757
 
                 * The reference will be released, when the task exection
1758
 
                 * is complete.
1759
 
                 */
1760
 
                tk->tk_reference();
1761
 
                tk->tk_running = TRUE;
1762
 
                xt_lock_mutex_ns(&db->db_pool_lock);
1763
 
 
1764
 
                /* Check if we need to start a new thread: */
1765
 
                db->db_pool_job_count++;
1766
 
                if (db->db_pool_job_count > db->db_pool_thread_count && db->db_pool_thread_count < XT_ASYNC_THREAD_COUNT) {
1767
 
                        if (!(pool_thread = db_create_pool_thread(db))) {
1768
 
                                if (thread->t_exception.e_xt_err == XT_ERR_MYSQL_NO_THREAD ||
1769
 
                                        thread->t_exception.e_xt_err == XT_ERR_MYSQL_ERROR) {
1770
 
                                        /* We can ignore this error if error:
1771
 
                                         * XT_ERR_MYSQL_NO_THREAD can occur on shutdown. If we already have
1772
 
                                         * pool threads, then this is no problem!
1773
 
                                         */
1774
 
                                        if (db->db_pool_thread_count > 0)
1775
 
                                                goto ignore_create_thread_error;
1776
 
                                }
1777
 
                                xt_unlock_mutex_ns(&db->db_pool_lock);
1778
 
                                goto failed_2;
1779
 
                        }
1780
 
                }
1781
 
 
1782
 
                ignore_create_thread_error:
1783
 
                if (db->db_task_queue_back)
1784
 
                        db->db_task_queue_back->tk_task_list_next = tk;
1785
 
                else
1786
 
                        db->db_task_queue_front = tk;
1787
 
                tk->tk_task_list_next = NULL;
1788
 
                db->db_task_queue_back = tk;
1789
 
                xt_signal_cond(NULL, &db->db_pool_cond);
1790
 
 
1791
 
                xt_unlock_mutex_ns(&db->db_pool_lock);
1792
 
 
1793
 
                if (pool_thread) {
1794
 
                        if (!xt_run_thread_ns(pool_thread, db_thread_pool_run_thread))
1795
 
                                goto failed_2;
1796
 
                }
1797
 
        }
1798
 
 
1799
 
        tk->tk_unlock();
1800
 
        
1801
 
        return OK;
1802
 
 
1803
 
        failed_2:
1804
 
        if (notify_complete)
1805
 
                tk->tk_waiting_threads.pl_remove_pointer(thread);
1806
 
        else
1807
 
                tk->tk_notify_threads.pl_remove_pointer(thread);
1808
 
 
1809
 
        failed_1:
1810
 
        xt_lock_thread(thread);
1811
 
        if (thread->st_tasks_todo.pl_remove_pointer(tk))
1812
 
                tk->tk_release();
1813
 
        xt_unlock_thread(thread);
1814
 
 
1815
 
        failed_0:
1816
 
        tk->tk_unlock();
1817
 
        tk->tk_release();
1818
 
        return FAILED;
1819
 
}
1820
 
 
1821
 
xtPublic void xt_wait_for_async_tasks(XTThreadPtr thread)
1822
 
{
1823
 
        if (!thread->st_tasks_todo.pl_is_empty()) {
1824
 
                xt_lock_thread(thread);
1825
 
                while (!thread->st_tasks_todo.pl_is_empty()) {
1826
 
                        xt_timed_wait_thread(thread, 100);
1827
 
#ifdef DEBUG
1828
 
                        XTTask *tk;
1829
 
 
1830
 
                        for (int i=0; i++; ) {
1831
 
                                if (!(tk = (XTTask *) thread->st_tasks_todo.pl_get_pointer(i)))
1832
 
                                        break;
1833
 
                                ASSERT_NS(tk->tk_running);
1834
 
                        }
1835
 
#endif
1836
 
                }
1837
 
                xt_unlock_thread(thread);
1838
 
        }
1839
 
}
1840
 
 
1841
 
xtPublic xtBool xt_wait_for_async_task_results(XTThreadPtr thread)
1842
 
{
1843
 
        XTTask *tk;
1844
 
        xtBool ok = TRUE;
1845
 
 
1846
 
        /* Wait for the task to finish: */
1847
 
        xt_wait_for_async_tasks(thread);
1848
 
 
1849
 
        /* Collect the results: */
1850
 
        while ((tk = xt_get_task_result(thread))) {
1851
 
                if (!tk->tk_success) {
1852
 
                        XTThreadPtr self = xt_get_self();
1853
 
 
1854
 
                        if (ok) {
1855
 
                                /* Transfer the first error to this thread... */
1856
 
                                if (tk->tk_exception)
1857
 
                                        self->t_exception = *tk->tk_exception;
1858
 
                                else
1859
 
                                        xt_register_errno(XT_REG_CONTEXT, ENOMEM);
1860
 
                                ok = FALSE;
1861
 
                        }
1862
 
                        else {
1863
 
                                /* Log all other errors: */
1864
 
                                if (tk->tk_exception)
1865
 
                                        xt_log_exception(self, tk->tk_exception, XT_LOG_ERROR);
1866
 
                        }
1867
 
                }
1868
 
 
1869
 
                tk->tk_release();
1870
 
        }
1871
 
        
1872
 
        return ok;
1873
 
}
1874
 
 
1875
 
/* After waiting, call this function to collect the results.
1876
 
 * NOTE: Returns a references task!
1877
 
 */
1878
 
xtPublic XTTask *xt_get_task_result(XTThreadPtr thread)
1879
 
{
1880
 
        XTTask *tk;
1881
 
 
1882
 
        xt_lock_thread(thread);
1883
 
        if ((tk = (XTTask *) thread->st_tasks_done.pl_get_pointer(thread->st_tasks_done.pl_size() - 1)))
1884
 
                thread->st_tasks_done.pl_remove_pointer(tk);
1885
 
        xt_unlock_thread(thread);
1886
 
        return tk;
1887
 
}
1888
 
 
1889
 
/*
1890
 
 * This function will wake up a waiting thread so that the 
1891
 
 * task continues without a waiting thread.
1892
 
 */
1893
 
xtPublic void xt_async_task_notify(XTTask *tk)
1894
 
{
1895
 
        if (!tk->tk_notify_threads.pl_is_empty()) {
1896
 
                tk->tk_lock();
1897
 
                db_notify_all(tk);
1898
 
                tk->tk_unlock();
1899
 
        }
1900
 
}
1901
 
 
1902
 
class XTTestTask : public XTTask {
1903
 
        public:
1904
 
        XTTestTask() : XTTask(),
1905
 
                tt_sleep_time(3),
1906
 
                tt_ref_count(0)
1907
 
        { }
1908
 
 
1909
 
        virtual void    tk_delete() { delete this; }
1910
 
        virtual xtBool  tk_task(XTThreadPtr thread);
1911
 
 
1912
 
        int                             tt_sleep_time;
1913
 
        int                             tt_ref_count;
1914
 
 
1915
 
        virtual void    tk_reference();
1916
 
        virtual void    tk_release();
1917
 
};
1918
 
 
1919
 
xtBool XTTestTask::tk_task(XTThreadPtr)
1920
 
{
1921
 
        sleep(tt_sleep_time);
1922
 
        return OK;
1923
 
}
1924
 
 
1925
 
static void db_multi_async_test(XTThreadPtr self, int count)
1926
 
{
1927
 
        XTTestTask *tt;
1928
 
 
1929
 
        if (!self->st_database) {
1930
 
                xt_logf(XT_NT_WARNING, "Open database required to run this test\n");
1931
 
                return;
1932
 
        }
1933
 
 
1934
 
        for (int i=0; i<count; i++) {
1935
 
                if (!(tt = new XTTestTask()))
1936
 
                        xt_throw_errno(XT_CONTEXT, ENOMEM);
1937
 
 
1938
 
                tt->tt_sleep_time = count;
1939
 
 
1940
 
                /* Run the task: */
1941
 
                tt->tk_reference();
1942
 
                if (!xt_run_async_task(tt, TRUE, FALSE, self, self->st_database)) {
1943
 
                        tt->tk_release();
1944
 
                        xt_throw(self);
1945
 
                }
1946
 
                tt->tk_release();
1947
 
        }
1948
 
 
1949
 
        /* Wait for the task to finish: */
1950
 
        xt_wait_for_async_tasks(self);
1951
 
 
1952
 
        /* Collect the results: */
1953
 
        while ((tt = (XTTestTask *) xt_get_task_result(self)))
1954
 
                tt->tk_release();
1955
 
}
1956
 
 
1957
 
void XTTestTask::tk_reference()
1958
 
{
1959
 
        tt_ref_count++;
1960
 
}
1961
 
 
1962
 
void XTTestTask::tk_release()
1963
 
{
1964
 
        tt_ref_count--;
1965
 
        if (!tt_ref_count)
1966
 
                delete this;
1967
 
}
1968
 
 
1969
 
xtPublic void xt_unit_test_async_task(XTThreadPtr self)
1970
 
{
1971
 
        db_multi_async_test(self, 1);
1972
 
        //db_multi_async_test(self, 10, true);
1973
 
        //db_multi_async_test(self, 5);
1974
 
}
1975