~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/TransTest.cc

  • Committer: Stewart Smith
  • Date: 2008-11-21 16:06:07 UTC
  • mto: This revision was merged to the branch mainline in revision 593.
  • Revision ID: stewart@flamingspork.com-20081121160607-n6gdlt013spuo54r
remove mysql_frm_type
and fix engines to return correct value from delete_table when table doesn't exist.
(it should be ENOENT).

Also fix up some tests that manipulated frm files by hand. These tests are no longer valid and will need to be rewritten in the not too distant future.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2009-06-17
22
 
 *
23
 
 * H&G2JCtL
24
 
 *
25
 
 * PBMS transaction handling test driver.
26
 
 *
27
 
 * This is a test driver for the PBMS transaction log. It uses 2 tables in a database and
28
 
 * inserts transaction records into 1 while writing them to the transaction log. The transaction
29
 
 * log reader thread reads the transactions from the log and writes them to the second table.
30
 
 * After a recovery the 2 tables should be identical.
31
 
 *
32
 
 * Built in crash points can be triggered to test that the recovery works correctly.
33
 
 *
34
 
 */
35
 
 
36
 
#ifdef UNIT_TEST
37
 
 
38
 
#include <stdlib.h>
39
 
#include <stdio.h>
40
 
#include <unistd.h>
41
 
#include <string.h>
42
 
#include <ctype.h>
43
 
#include <inttypes.h>
44
 
 
45
 
#include "cslib/CSConfig.h"
46
 
#include "cslib/CSGlobal.h"
47
 
#include "cslib/CSThread.h"
48
 
#include "cslib/CSStrUtil.h"
49
 
#include "cslib/CSStorage.h"
50
 
 
51
 
#include "trans_cache_ms.h"
52
 
#include "trans_log_ms.h"
53
 
 
54
 
#include "mysql.h"
55
 
 
56
 
#define CREATE_TABLE_BODY "\
57
 
 (\
58
 
  blob_ref INT NOT NULL AUTO_INCREMENT,\
59
 
  tab_id INT NOT NULL,\
60
 
  blob_id BIGINT NOT NULL, \
61
 
  committed BOOLEAN NOT NULL DEFAULT 0, \
62
 
  PRIMARY KEY (blob_ref, tab_id)\
63
 
)\
64
 
ENGINE = INNODB\
65
 
"
66
 
#ifdef LOG_TABLE
67
 
#undef LOG_TABLE
68
 
#endif
69
 
 
70
 
#define LOG_TABLE       "translog"
71
 
#define REF_TABLE       "transref_%d"
72
 
#define MAX_THREADS     20
73
 
 
74
 
#define A_DB_ID 123
75
 
 
76
 
#define TEST_DATABASE_NAME "TransTest"
77
 
static const char *user_name = "root";
78
 
static const char *user_passwd = "";
79
 
static int      port = 3306;
80
 
static const char *host = "localhost";
81
 
static int nap_time = 1000;
82
 
static int max_transaction = 10; // The maximum number of records generated per transaction
83
 
static bool dump_log = false, overflow_crash = false;
84
 
static int crash_site = 0;              // The location to crash at.
85
 
static int num_threads = 1;             // The number of writer threads.
86
 
//static int rate = 1000;                       // The maximum transactions per second to allow.
87
 
static time_t timeout = 60;             // How long to run for before crashing or shutting down.
88
 
static bool revover_only = false;
89
 
static bool recreate = false;
90
 
 
91
 
static uint32_t cache_size = 0, log_size = 0;
92
 
 
93
 
static MSTrans *trans_log;
94
 
 
95
 
static CSThreadList *thread_list;
96
 
 
97
 
static MYSQL *new_connection(bool check_for_db);
98
 
 
99
 
static CSThread *main_thread;
100
 
 
101
 
//------------------------------------------------
102
 
class TransTestThread : public CSDaemon {
103
 
public:
104
 
        TransTestThread(): 
105
 
                CSDaemon(thread_list),
106
 
                count(0),
107
 
                myActivity(0),
108
 
                log(NULL),
109
 
                stopit(false),
110
 
                finished(false),
111
 
                mysql(NULL)
112
 
                {}
113
 
 
114
 
         ~TransTestThread()
115
 
        {
116
 
                if (log)
117
 
                        log->release();
118
 
                        
119
 
                if (mysql)
120
 
                        mysql_close(mysql);
121
 
        }
122
 
         
123
 
        MSTrans *log;
124
 
        MYSQL *mysql;
125
 
        uint32_t count;
126
 
        uint32_t myActivity;
127
 
 
128
 
        bool stopit;
129
 
        bool finished;
130
 
        
131
 
        virtual bool doWork() {return true;}    
132
 
};
133
 
 
134
 
//------------------------------------------------
135
 
class TransTestWriterThread : public TransTestThread {
136
 
public:
137
 
        TransTestWriterThread():TransTestThread() {}
138
 
        
139
 
        uint32_t tab_id;
140
 
        FILE    *myLog;
141
 
        
142
 
        void generate_records();
143
 
        bool doWork() 
144
 
        {
145
 
                generate_records();
146
 
                finished = true;
147
 
                return true;
148
 
        }
149
 
        
150
 
        static TransTestWriterThread *newTransTestWriterThread(uint32_t id)
151
 
        {
152
 
                TransTestWriterThread *tt;
153
 
                enter_();
154
 
                
155
 
                
156
 
                new_(tt, TransTestWriterThread());
157
 
                
158
 
                char name[32];
159
 
                sprintf(name, "write_%d.log", id);
160
 
                if (recreate)
161
 
                        tt->myLog = fopen(name, "w+");
162
 
                else {
163
 
                        tt->myLog = fopen(name, "a+");
164
 
                        fprintf(tt->myLog, "====================================================\n");
165
 
                }
166
 
                
167
 
                tt->tab_id = id ;
168
 
                tt->mysql = new_connection(false);
169
 
                tt->log = trans_log;
170
 
                trans_log->retain();
171
 
                
172
 
                return_(tt); 
173
 
        }
174
 
        
175
 
        
176
 
};
177
 
 
178
 
//------------------------------------------------
179
 
class TransTestReaderThread : public TransTestThread {
180
 
public:
181
 
        TransTestReaderThread():TransTestThread(){}
182
 
        
183
 
        bool recovering;
184
 
        void processTransactionLog();
185
 
        bool doWork() 
186
 
        {
187
 
                processTransactionLog();
188
 
                return true;
189
 
        }
190
 
        
191
 
        static TransTestReaderThread *newTransTestReaderThread(MSTrans *log)
192
 
        {
193
 
                TransTestReaderThread *tt;
194
 
                enter_();
195
 
                
196
 
                new_(tt, TransTestReaderThread());
197
 
                tt->mysql = new_connection(false);
198
 
                tt->log = log;
199
 
                tt->log->retain();
200
 
                
201
 
                tt->log->txn_SetReader(tt); // The reader daemon is passed in unreferenced.
202
 
                tt->recovering = false;
203
 
                return_(tt); 
204
 
        }
205
 
        
206
 
        bool rec_found(uint64_t id, uint32_t tab_id) 
207
 
        {
208
 
                char stmt[100];
209
 
                MYSQL_RES *results = NULL;            
210
 
                bool found;
211
 
                
212
 
                sprintf(stmt, "SELECT blob_ref FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %"PRIu32"", id, tab_id); 
213
 
                if (mysql_query(mysql, stmt)) {
214
 
                        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
215
 
                        printf("%s\n", stmt);
216
 
                        exit(1);
217
 
                }
218
 
                
219
 
                
220
 
                results = mysql_store_result(mysql);
221
 
                if (!results){
222
 
                        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
223
 
                        exit(1);
224
 
                }
225
 
 
226
 
                found = (mysql_num_rows(results) == 1);         
227
 
                mysql_free_result(results);
228
 
                
229
 
                return found;
230
 
                
231
 
        }
232
 
        
233
 
        
234
 
};
235
 
 
236
 
TransTestReaderThread *TransReader;
237
 
//------------------------------------------------
238
 
static void report_mysql_error(MYSQL *mysql, int line, const char *msg)
239
 
{
240
 
        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), line);
241
 
        if (msg)
242
 
                printf("%s\n", msg);
243
 
        exit(1);
244
 
}
245
 
 
246
 
 
247
 
//------------------------------------------------
248
 
static MYSQL *new_connection(bool check_for_db)
249
 
{
250
 
        MYSQL *mysql;
251
 
 
252
 
        mysql = mysql_init(NULL);
253
 
        if (!mysql) {
254
 
                printf( "mysql_init() failed.\n");
255
 
                exit(1);
256
 
        }
257
 
 
258
 
        if (mysql_real_connect(mysql, host, user_name, user_passwd, NULL, port, NULL, 0) == NULL)
259
 
                report_mysql_error(mysql, __LINE__, "mysql_real_connect()");
260
 
 
261
 
        if (check_for_db) {
262
 
                MYSQL_RES *results = NULL;            
263
 
                
264
 
                if (mysql_query(mysql, "show databases like \"" TEST_DATABASE_NAME "\""))
265
 
                        report_mysql_error(mysql, __LINE__, "show databases like \"" TEST_DATABASE_NAME "\"");
266
 
                
267
 
                results = mysql_store_result(mysql);
268
 
                if (!results)
269
 
                        report_mysql_error(mysql, __LINE__, "mysql_store_result()");
270
 
 
271
 
 
272
 
                if (mysql_num_rows(results) != 1) {
273
 
                        if (mysql_query(mysql, "create database " TEST_DATABASE_NAME ))
274
 
                                report_mysql_error(mysql, __LINE__, "create database " TEST_DATABASE_NAME );
275
 
                }
276
 
                mysql_free_result(results);
277
 
        }
278
 
        
279
 
        if (mysql_query(mysql, "use " TEST_DATABASE_NAME ))
280
 
                report_mysql_error(mysql, __LINE__, "use " TEST_DATABASE_NAME );
281
 
 
282
 
        return mysql;
283
 
}
284
 
 
285
 
//------------------------------------------------
286
 
static void init_database(MYSQL *mysql, int cnt)
287
 
{
288
 
        char stmt[1024];
289
 
        
290
 
        unlink("ms-trans-log.dat");
291
 
        mysql_query(mysql, "drop table if exists " LOG_TABLE  ";");
292
 
        
293
 
        if (mysql_query(mysql, "create table " LOG_TABLE  CREATE_TABLE_BODY ";")){
294
 
                printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
295
 
                exit(1);
296
 
        }
297
 
 
298
 
        while (cnt) {
299
 
                sprintf(stmt, "drop table if exists " REF_TABLE  ";", cnt);
300
 
                mysql_query(mysql, stmt);
301
 
                sprintf(stmt, "create table " REF_TABLE  CREATE_TABLE_BODY ";", cnt);
302
 
                if (mysql_query(mysql, stmt)){
303
 
                        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
304
 
                        exit(1);
305
 
                }
306
 
                cnt--;
307
 
        }
308
 
}
309
 
 
310
 
 
311
 
//------------------------------------------------
312
 
static void display_help(const char *app)
313
 
{
314
 
        printf("\nUsage:\n");
315
 
        printf("%s -help | -r  [-t<num_threads>] | -d | [-n] [-sc <cache_size>] [-sl <log_size>] [-c <crash_site>]  [-t<num_threads>] [<timeout>]\n\n", app);
316
 
        
317
 
        printf("-r: Test recovery after a crash or shutdown.\n");
318
 
        printf("-d: Dump the transaction log.\n");
319
 
        printf("-n: Recreate the tables and recovery log.\n");
320
 
        printf("-c <crash_site>: Crash at this location rather than shutting down. Max = %d\n", MAX_CRASH_POINT+1);
321
 
        printf("-t<num_threads>: The number of writer threads to use, default is %d.\n", num_threads);
322
 
        //printf("-r<rate>: The number af records to be inserted per second, default is %d.\n", rate);
323
 
        printf("<timeout>: The number seconds the test should run before shuttingdown or crashing, default is %d.\n\n", timeout);
324
 
        exit(1);
325
 
}
326
 
 
327
 
//---------------------------------     
328
 
static void process_args(int argc, const char * argv[])
329
 
{
330
 
        if (argc < 2)
331
 
                return;
332
 
                
333
 
        for (int i = 1; i < argc; ) {
334
 
                if ( argv[i][0] != '-') { // Must be timeout
335
 
                        timeout = atoi(argv[i]);
336
 
                        i++;
337
 
                        if ((i != argc) || !timeout)
338
 
                                display_help(argv[0]);
339
 
                } else {
340
 
                        switch (argv[i][1]) {
341
 
                                case 'h':
342
 
                                        display_help(argv[0]);
343
 
                                        break;
344
 
                                        
345
 
                                case 'r':
346
 
                                        if (argc > 4 || argv[i][2])
347
 
                                                display_help(argv[0]);
348
 
                                        revover_only = true;
349
 
                                        i++;
350
 
                                        break;
351
 
                                        
352
 
                                case 'd':
353
 
                                        if (argc != 2 || argv[i][2])
354
 
                                                display_help(argv[0]);
355
 
                                        dump_log = true;
356
 
                                        i++;
357
 
                                        break;
358
 
                                        
359
 
                                case 'n':
360
 
                                        if (argv[i][2])
361
 
                                                display_help(argv[0]);
362
 
                                        recreate = true;
363
 
                                        i++;
364
 
                                        break;
365
 
                                        
366
 
                                case 'c':
367
 
                                        if (argv[i][2])
368
 
                                                display_help(argv[0]);
369
 
                                        i++;
370
 
                                        crash_site = atoi(argv[i]);
371
 
                                        if (crash_site == (MAX_CRASH_POINT + 1))
372
 
                                                overflow_crash = true;
373
 
                                        else if ((!crash_site) || (crash_site >  MAX_CRASH_POINT))
374
 
                                                display_help(argv[0]);
375
 
                                        i++;
376
 
                                        break;
377
 
                                        
378
 
                                case 's': {
379
 
                                                uint32_t size;
380
 
                                                
381
 
                                                size = atol(argv[i+1]);
382
 
                                                if (!size)
383
 
                                                        display_help(argv[0]);
384
 
                                                        
385
 
                                                if (argv[i][2] == 'c') 
386
 
                                                        cache_size = size;
387
 
                                                else if (argv[i][2] == 'l')
388
 
                                                        log_size = size;
389
 
                                                else 
390
 
                                                        display_help(argv[0]);
391
 
                                                
392
 
                                                i+=2;
393
 
                                        }
394
 
                                        break;
395
 
                                        
396
 
                                case 't':
397
 
                                        if (argv[i][2])
398
 
                                                display_help(argv[0]);
399
 
                                        i++;
400
 
                                        num_threads = atoi(argv[i]);
401
 
                                        if (!num_threads)
402
 
                                                display_help(argv[0]);
403
 
                                        i++;
404
 
                                        break;
405
 
/*                                      
406
 
                                case 'r':
407
 
                                        i++;
408
 
                                        rate = atoi(argv[i]);
409
 
                                        if (!rate)
410
 
                                                display_help(argv[0]);
411
 
                                        i++;
412
 
                                        break;
413
 
*/
414
 
                                default:
415
 
                                        display_help(argv[0]);
416
 
                        }
417
 
                        
418
 
                }
419
 
        }
420
 
}
421
 
 
422
 
//---------------------------------     
423
 
static void init_env()
424
 
{
425
 
        cs_init_memory();
426
 
        CSThread::startUp();
427
 
        if (!(main_thread = CSThread::newCSThread())) {
428
 
                CSException::logOSError(CS_CONTEXT, ENOMEM);
429
 
                exit(1);
430
 
        }
431
 
        
432
 
        CSThread::setSelf(main_thread);
433
 
        
434
 
        enter_();
435
 
        try_(a) {
436
 
                trans_log = MSTrans::txn_NewMSTrans("./ms-trans-log.dat", /*dump_log*/ true);
437
 
                new_(thread_list, CSThreadList()); 
438
 
        }
439
 
        catch_(a) {
440
 
                self->logException();
441
 
                CSThread::shutDown();
442
 
                exit(1);
443
 
        }
444
 
        cont_(a);
445
 
        
446
 
}
447
 
//---------------------------------     
448
 
static void deinit_env()
449
 
{
450
 
        if (thread_list) {
451
 
                thread_list->release();
452
 
                thread_list = NULL;
453
 
        }
454
 
        
455
 
        if (trans_log) {
456
 
                trans_log->release();
457
 
                trans_log = NULL;
458
 
        }
459
 
        
460
 
        if (main_thread) {
461
 
                main_thread->release();
462
 
                main_thread = NULL;
463
 
        }
464
 
        
465
 
        CSThread::shutDown();
466
 
        cs_exit_memory();
467
 
}
468
 
//---------------------------------     
469
 
static bool verify_database(MYSQL *mysql)
470
 
{
471
 
        MYSQL_RES **r_results, *l_results = NULL;            
472
 
        MYSQL_ROW r_record, l_record;
473
 
        bool ok = false;
474
 
        int i, log_row_cnt, ref_row_cnt = 0, tab_id;
475
 
        char stmt[1024];
476
 
        
477
 
        r_results = (MYSQL_RES **) malloc(num_threads * sizeof(MYSQL_RES *));
478
 
        
479
 
        if (mysql_query(mysql, "select * from "LOG_TABLE" where committed = 0 order by blob_ref")) 
480
 
                report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
481
 
                                                
482
 
        l_results = mysql_store_result(mysql);
483
 
        if (!l_results)
484
 
                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
485
 
 
486
 
        log_row_cnt = mysql_num_rows(l_results);
487
 
        mysql_free_result(l_results);
488
 
        if (log_row_cnt)
489
 
                printf("Uncommitted references: %d\n", log_row_cnt);
490
 
 
491
 
        //---------
492
 
        for (i =0; i < num_threads; i++) {
493
 
                sprintf(stmt, "select * from "REF_TABLE" order by blob_ref", i+1);
494
 
                if (mysql_query(mysql, stmt)) 
495
 
                        report_mysql_error(mysql, __LINE__, stmt);
496
 
                                                        
497
 
                r_results[i] = mysql_store_result(mysql);
498
 
                if (!r_results)
499
 
                        report_mysql_error(mysql, __LINE__, "mysql_store_result()");
500
 
                        
501
 
                ref_row_cnt += mysql_num_rows(r_results[i]);
502
 
        }       
503
 
        //---------
504
 
        if (mysql_query(mysql, "select * from "LOG_TABLE" order by blob_ref")) 
505
 
                report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
506
 
                                                
507
 
        l_results = mysql_store_result(mysql);
508
 
        if (!l_results)
509
 
                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
510
 
 
511
 
        log_row_cnt = mysql_num_rows(l_results);
512
 
        
513
 
        if (log_row_cnt != ref_row_cnt) {
514
 
                if (ref_row_cnt > log_row_cnt) {
515
 
                        printf("verify_database() Failed: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);
516
 
                        goto done;
517
 
                }
518
 
                
519
 
                printf("verify_database() Warnning: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);         
520
 
                printf("Possible unreferenced BLOBs\n");
521
 
        }
522
 
        
523
 
        if (log_row_cnt == ref_row_cnt) {
524
 
                for ( i = 0; i < log_row_cnt; i++) {
525
 
                        l_record = mysql_fetch_row(l_results);
526
 
                        tab_id = atol(l_getUpdateRecord());
527
 
                        r_record = mysql_fetch_row(r_results[tab_id-1]);                
528
 
                        if ((atol(l_getInsertRecord()) != atol(r_getInsertRecord())) ||
529
 
                                (atol(l_getUpdateRecord()) != atol(r_getUpdateRecord())) ||
530
 
                                (atol(l_record[2]) != atol(r_record[2]))) {
531
 
                                
532
 
                                printf("verify_database() Failed: in row %d, tab_id %d\n", i+1, tab_id);
533
 
                                printf("field 1:  %d =? %d\n", atol(l_getInsertRecord()), atol(r_getInsertRecord()));
534
 
                                printf("field 2:  %d =? %d\n", atol(l_getUpdateRecord()), atol(r_getUpdateRecord()));
535
 
                                printf("field 3:  %d =? %d\n", atol(l_record[2]), atol(r_record[2]));
536
 
                                goto done;
537
 
                        }
538
 
                                
539
 
                }
540
 
        } else { // The important thing is that there are no BLOBs in the ref tabels that are not in the log table.
541
 
 
542
 
                for (i =0; i < num_threads; i++) {
543
 
                        mysql_free_result(r_results[i]);
544
 
                        
545
 
                        sprintf(stmt, "select * from "REF_TABLE" where  blob_ref not in (select blob_ref from TransTest.translog where tab_id = %d)", i+1, i+1);
546
 
                        if (mysql_query(mysql, stmt)) 
547
 
                                report_mysql_error(mysql, __LINE__, stmt);
548
 
                                                                
549
 
                        r_results[i] = mysql_store_result(mysql);
550
 
                        if (!r_results)
551
 
                                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
552
 
                                
553
 
                        if (mysql_num_rows(r_results[i])) {
554
 
                                printf("verify_database() Failed, Missing BLOBs: %s\n", stmt);
555
 
                                goto done;
556
 
                        }
557
 
                }       
558
 
        }
559
 
        
560
 
        printf("verify_database() OK.\n");
561
 
        ok = true;
562
 
        
563
 
        done:
564
 
        
565
 
        for (i =0; i < num_threads; i++) {
566
 
                mysql_free_result(r_results[i]);
567
 
        }
568
 
        free(r_results);
569
 
        
570
 
        mysql_free_result(l_results);
571
 
        
572
 
#ifdef DEBUG    
573
 
        if (!ok) {
574
 
                trans_log->txn_DumpLog("trace.log");
575
 
        }
576
 
#endif  
577
 
        return ok;
578
 
}
579
 
 
580
 
//------------------------------------------------
581
 
void TransTestReaderThread::processTransactionLog()
582
 
{
583
 
        MSTransRec rec = {0};
584
 
        MS_TxnState state;
585
 
        char stmt[1024];
586
 
        uint32_t last_tid = 0;
587
 
        enter_();
588
 
        
589
 
        // Read in transactions from the log and update
590
 
        // the database table based on them.
591
 
        
592
 
        try_(a) {
593
 
                while (!myMustQuit && !stopit) {
594
 
                        // This will sleep while waiting for the next 
595
 
                        // completed transaction.
596
 
                        log->txn_GetNextTransaction(&rec, &state); 
597
 
                        if (myMustQuit)
598
 
                                break;
599
 
 
600
 
                        myActivity++;
601
 
#ifdef CHECK_TIDS
602
 
                        if (num_threads == 1) {
603
 
                                ASSERT( ((last_tid + 1) == rec.tr_id) || (last_tid  == rec.tr_id) || !last_tid);
604
 
                                last_tid = rec.tr_id;
605
 
                        }
606
 
#endif                  
607
 
                        if (!recovering) 
608
 
                                count++;
609
 
                        
610
 
                        switch (TRANS_TYPE(rec.tr_type)) {
611
 
                                case MS_ReferenceTxn:
612
 
                                case MS_DereferenceTxn:
613
 
                                case MS_RollBackTxn:
614
 
                                case MS_CommitTxn:
615
 
                                case MS_RecoveredTxn:
616
 
                                break;
617
 
                                default:
618
 
                                        printf("Unexpected transaction type: %d\n", rec.tr_type);
619
 
                                        exit(1);                                                        
620
 
                        }
621
 
                        
622
 
                        if (state == MS_Committed){
623
 
                                // Dereferences are applied when the transaction is commited.
624
 
                                // References are applied imediatly and removed if the transaction is rolled back.
625
 
                                if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) {
626
 
                                        sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
627
 
                                        if (mysql_query(mysql, stmt))  
628
 
                                                report_mysql_error(mysql, __LINE__, stmt);
629
 
                                } else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
630
 
                                        sprintf(stmt, "UPDATE "LOG_TABLE" SET committed = 1 WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
631
 
                                        if (mysql_query(mysql, stmt))  
632
 
                                                report_mysql_error(mysql, __LINE__, stmt);
633
 
                                }
634
 
                        } else if (state == MS_RolledBack) { 
635
 
                                //printf("ROLLBACK!\n");
636
 
                                if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
637
 
                                        sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
638
 
                                        if (mysql_query(mysql, stmt))  
639
 
                                                report_mysql_error(mysql, __LINE__, stmt);
640
 
                                }
641
 
                        } else if (state == MS_Recovered) { 
642
 
                                printf("Recovered transaction being ignored:\n");
643
 
                                printf("blob_ref = %"PRIu64", tab_id = %d, blob_id = %"PRIu64"\n\n", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
644
 
                        } else {
645
 
                                printf("Unexpected transaction state: %d\n", state);
646
 
                                exit(1);                                                        
647
 
                        }
648
 
                        
649
 
                        
650
 
                }
651
 
        }
652
 
        catch_(a) {
653
 
                self->logException();
654
 
                printf("\n\n!!!!!!!! THE TRANSACTION LOG READER DIED! !!!!!!!!!!!\n\n");
655
 
                if (!myMustQuit && !stopit)
656
 
                        exit(1);
657
 
        }
658
 
        cont_(a);
659
 
        printf("The transaction log reader shutting down.\n");
660
 
        exit_();
661
 
}
662
 
 
663
 
//------------------------------------------------
664
 
void TransTestWriterThread::generate_records()
665
 
{
666
 
 
667
 
        MS_Txn  txn_type;               
668
 
        uint64_t        blob_id;        
669
 
        uint64_t        blob_ref_id;    
670
 
        int tsize, i;
671
 
        bool do_delete;
672
 
                
673
 
        char stmt[1024];
674
 
        enter_();
675
 
 
676
 
        try_(a) {
677
 
                while (!myMustQuit && !stopit) {
678
 
                
679
 
                        myActivity++;
680
 
                        usleep(nap_time); // Give up a bit of time
681
 
                        if (myMustQuit || stopit)
682
 
                                break;
683
 
                                
684
 
                        tsize = rand() % max_transaction;
685
 
                        
686
 
                        if (mysql_autocommit(mysql, 0))
687
 
                                report_mysql_error(mysql, __LINE__, "mysql_autocommit()");
688
 
                                
689
 
                        i = 0;
690
 
                        do {
691
 
                                do_delete = ((rand() %2) == 0);
692
 
                                
693
 
                                // decide if this is an insert or delete
694
 
                                if (do_delete) {
695
 
                                        MYSQL_RES *results = NULL;            
696
 
                                        MYSQL_ROW record;
697
 
                                        int cnt;
698
 
                                        
699
 
                                        // If we are deleting then randomly select a record to delete
700
 
                                        // and delete it. 
701
 
                                        
702
 
                                        txn_type = MS_DereferenceTxn;
703
 
 
704
 
                                        sprintf(stmt, "select * from "REF_TABLE, tab_id); 
705
 
                                        if (mysql_query(mysql, stmt)) 
706
 
                                                report_mysql_error(mysql, __LINE__, stmt);
707
 
                                                
708
 
                                        results = mysql_store_result(mysql);
709
 
                                        if (!results)
710
 
                                                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
711
 
                                                
712
 
                                        cnt = mysql_num_rows(results);
713
 
                                        if (!cnt)
714
 
                                                do_delete = false; // There is nothing to delete
715
 
                                        else {
716
 
                                                mysql_data_seek(results, rand()%cnt);
717
 
                                                record = mysql_fetch_row(results);
718
 
                                                        
719
 
                                                blob_ref_id = atol(getInsertRecord());
720
 
                                                blob_id = atol(record[2]);
721
 
                                                
722
 
                                                sprintf(stmt, "DELETE FROM "REF_TABLE" WHERE blob_ref = %"PRIu64" AND blob_id = %"PRIu64"", tab_id, blob_ref_id, blob_id); 
723
 
                                                if (mysql_query(mysql, stmt))  
724
 
                                                        report_mysql_error(mysql, __LINE__, stmt);
725
 
                                                        
726
 
                                                if (mysql_affected_rows(mysql) == 0)
727
 
                                                        do_delete = false; // Another thread must have deleted the row first.
728
 
                                                else
729
 
                                                        fprintf(myLog, "DELETE %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id); 
730
 
                                        }
731
 
                                        
732
 
                                        mysql_free_result(results);
733
 
                                } 
734
 
                                
735
 
                                if (!do_delete) {
736
 
                                        blob_id = self->myTID; // Assign the tid as the blob id to help with debugging.
737
 
                                        txn_type = MS_ReferenceTxn;
738
 
                                        
739
 
                                        sprintf(stmt, "INSERT INTO "REF_TABLE" VALUES( NULL, %d, %"PRIu64", 0)", tab_id, tab_id, blob_id); 
740
 
                                        if (mysql_query(mysql, stmt)) 
741
 
                                                report_mysql_error(mysql, __LINE__, stmt);
742
 
                                                
743
 
                                        blob_ref_id = mysql_insert_id(mysql);
744
 
                                        if (!blob_ref_id)
745
 
                                                report_mysql_error(mysql, __LINE__, "mysql_insert_id() returned 0");
746
 
                                        
747
 
                                        fprintf(myLog, "INSERT %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);   
748
 
                                        // Apply the blob reference now. This will be undone if the transaction is rolled back.
749
 
                                        sprintf(stmt, "INSERT INTO "LOG_TABLE" VALUES(%"PRIu64", %d, %"PRIu64", 0)", blob_ref_id, tab_id, blob_id); 
750
 
                                        if (mysql_query(mysql, stmt)) 
751
 
                                                report_mysql_error(mysql, __LINE__, stmt);
752
 
                                }
753
 
 
754
 
                                i++;
755
 
                                count++;
756
 
                                if (i >= tsize) { //Commit the database transaction before the log transaction.
757
 
                                        bool rollback;
758
 
                                        
759
 
                                        rollback = ((tsize > 0) && ((rand() % 1000) == 0));
760
 
                                        if (rollback) {
761
 
                                                printf("Rollback\n");
762
 
                                                if (mysql_rollback(mysql)) // commit the staement to the database,
763
 
                                                        report_mysql_error(mysql, __LINE__, "mysql_rollback()");        
764
 
                                                fprintf(myLog, "Rollback %"PRIu32"\n", self->myTID);    
765
 
                                                log->txn_LogTransaction(MS_RollBackTxn);
766
 
                                        } else {
767
 
                                                if (mysql_commit(mysql)) // commit the staement to the database,
768
 
                                                        report_mysql_error(mysql, __LINE__, "mysql_commit()");  
769
 
                                                fprintf(myLog, "Commit %"PRIu32"\n", self->myTID);      
770
 
                                                log->txn_LogTransaction(txn_type, true, A_DB_ID, tab_id, blob_id, blob_ref_id);
771
 
                                        }
772
 
                                } else
773
 
                                        log->txn_LogTransaction(txn_type, false, A_DB_ID, tab_id, blob_id, blob_ref_id);
774
 
                                                                
775
 
                        } while ( i < tsize);
776
 
                                                
777
 
                }
778
 
        }
779
 
        
780
 
        catch_(a) {
781
 
                self->logException();
782
 
                printf("\n\nA writer thread for table %d died! \n\n", tab_id);
783
 
                if (i == tsize) {
784
 
                        printf(" It is possible that the last %d operations on table %d were committed to the database but not to the log.\n", tsize, tab_id);
785
 
                }
786
 
                if (!myMustQuit && !stopit)
787
 
                        exit(1);
788
 
        }
789
 
        cont_(a);
790
 
        printf("Writer thread for table %d is shutting down.\n", tab_id);
791
 
        exit_();
792
 
}
793
 
 
794
 
// SELECT * FROM TransTest.translog where  blob_ref not in (select blob_ref from TransTest.transref)
795
 
// SELECT * FROM TransTest.transref_1 where  blob_ref not in (select blob_ref from TransTest.translog where tab_id = 1)
796
 
// SELECT * FROM TransTest.translog where  tab_id = 1 AND blob_ref not in (select blob_ref from TransTest.transref_1)
797
 
// select count(*) from TransTest.translog where committed = 1
798
 
//---------------------------------     
799
 
int main (int argc, const char * argv[]) 
800
 
{
801
 
        MYSQL *mysql;
802
 
        TransTestWriterThread **writer = NULL;
803
 
        int rtc = 1;
804
 
        
805
 
        process_args(argc, argv);
806
 
        
807
 
        mysql = new_connection(true);
808
 
        
809
 
        if (recreate)
810
 
                init_database(mysql, num_threads);
811
 
                
812
 
        init_env();
813
 
        enter_();
814
 
        
815
 
        if (dump_log) {
816
 
                printf("LOG dumped\n");
817
 
                exit(1);
818
 
        }
819
 
        
820
 
        TransReader = TransTestReaderThread::newTransTestReaderThread(trans_log);
821
 
        push_(TransReader);
822
 
        TransReader->recovering = true;
823
 
        TransReader->start();
824
 
        
825
 
        // wait until the recovery is complete.
826
 
        while (trans_log->txn_GetNumRecords())
827
 
                usleep(100);
828
 
                
829
 
        TransReader->recovering = false;
830
 
        
831
 
        if (log_size)
832
 
                trans_log->txn_SetLogSize(log_size);
833
 
                
834
 
        if (cache_size)
835
 
                trans_log->txn_SetCacheSize(cache_size);
836
 
                
837
 
        if (revover_only) {
838
 
                TransReader->stopit = true;
839
 
                if (verify_database(mysql))
840
 
                        rtc = 0;
841
 
                goto done;
842
 
        }
843
 
        
844
 
        try_(a) {
845
 
                writer = (TransTestWriterThread **) cs_malloc(num_threads * sizeof(TransTestWriterThread *));
846
 
                for (int i = 0; i < num_threads; i++) {
847
 
                        TransTestWriterThread *wt = TransTestWriterThread::newTransTestWriterThread(i+1);
848
 
                        wt->start();
849
 
                        writer[i] = wt;
850
 
                }
851
 
        
852
 
                printf("Timeout: %d seconds\n", timeout); 
853
 
                timeout += time(NULL);
854
 
                int header = 0;
855
 
                while (timeout > time(NULL)) {
856
 
                        MSTransStatsRec stats;
857
 
                        self->sleep(1000);
858
 
                        trans_log->txn_GetStats(&stats);
859
 
                        
860
 
                        
861
 
                        if (!(header%20)) {
862
 
                                for (int i = 0; i < num_threads; i++) {                         
863
 
                                        if (writer[i]->myActivity == 0) {
864
 
                                                printf("Writer thread %d HUNG!!!\n", i);
865
 
                                        }
866
 
                                        writer[i]->myActivity = 0;
867
 
                                }
868
 
                                
869
 
                                if (TransReader->myActivity == 0) {
870
 
                                        printf("Reader thread HUNG!!!\n");
871
 
                                }
872
 
                                TransReader->myActivity = 0;
873
 
                                        
874
 
                                printf("%s | %s | %s | %s | %s | %s | %s | %s\n", "LogSize", "Full", "MaxSize", "Overflows", "Overflowing", "CacheSize", "Cache Used", "Cache Hit");
875
 
                        }
876
 
                        header++;
877
 
                        //printf("Writes: %d \t\t Reads: %d \t%d \t start: %"PRIu64"\t\t eol:%"PRIu64"\n", count, TransReader->count, count - TransReader->count, trans_log->txn_Start, trans_log->txn_EOL);
878
 
                        printf("%7llu | %3d%% | %7llu | %9d | %11s | %9d | %9d%% | %9d%%\n",// | \t\t\t%"PRIu64" \t%"PRIu64"\n", 
879
 
                                stats.ts_LogSize,
880
 
                                stats.ts_PercentFull,
881
 
                                stats.ts_MaxSize,
882
 
                                stats.ts_OverflowCount,
883
 
                                (stats.ts_IsOverflowing)?"Over Flow": "   ---   ",
884
 
                                stats.ts_TransCacheSize,
885
 
                                stats.ts_PercentTransCacheUsed,
886
 
                                stats.ts_PercentCacheHit//, trans_log->txn_Start, trans_log->txn_EOL
887
 
                                );
888
 
                                
889
 
                                if (stats.ts_IsOverflowing && overflow_crash) {
890
 
                                        printf("Simulating crash while in overflow\n");
891
 
                                        exit(1);
892
 
                                }
893
 
                }
894
 
 
895
 
#ifdef CRASH_TEST               
896
 
                if (crash_site) {
897
 
                        printf("Crashing at crash site %d\n", crash_site);
898
 
                        trans_test_crash_point = crash_site;
899
 
                        // set the crash site and wait to die.
900
 
                        while(1)
901
 
                                self->sleep(1000);
902
 
                }
903
 
#endif
904
 
                
905
 
                printf("Shutting down the writer threads:\n");
906
 
                for (int i = 0; i < num_threads; i++) {
907
 
                        writer[i]->stopit = true;
908
 
                }
909
 
                
910
 
                TransReader->stopit = true;
911
 
                // Give the writers a chance to shutdown by themselves.
912
 
                int cnt = 100;
913
 
                while (cnt) {
914
 
                        int i;
915
 
                        for (i = 0; i < num_threads && writer[i]->finished; i++);
916
 
                        if (i == num_threads && TransReader->finished)
917
 
                                break;
918
 
                        self->sleep(10);        
919
 
                        cnt--;                  
920
 
                }
921
 
                
922
 
                for (int i = 0; i < num_threads; i++) {
923
 
                        writer[i]->stop();
924
 
                }
925
 
                
926
 
        }
927
 
        rtc = 0;
928
 
        catch_(a) {
929
 
                printf("Main thread abort.\n");
930
 
                self->logException();
931
 
        }
932
 
        cont_(a);
933
 
        if (writer) {
934
 
                for (int i = 0; i < num_threads; i++) {
935
 
                        writer[i]->stop();
936
 
                        writer[i]->release();
937
 
                }
938
 
                cs_free(writer);
939
 
        }
940
 
                
941
 
done:
942
 
        TransReader->stop();
943
 
        release_(TransReader);
944
 
        
945
 
        outer_();
946
 
        
947
 
        thread_list->stopAllThreads();
948
 
        deinit_env();
949
 
        mysql_close(mysql);
950
 
        exit(rtc);
951
 
}
952
 
 
953
 
#endif // UNIT_TEST