~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/TransTest.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
merge lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Barry Leslie
20
 
 *
21
 
 * 2009-06-17
22
 
 *
23
 
 * H&G2JCtL
24
 
 *
25
 
 * PBMS transaction handling test driver.
26
 
 *
27
 
 * This is a test driver for the PBMS transaction log. It uses 2 tables in a database and
28
 
 * inserts transaction records into 1 while writing them to the transaction log. The transaction
29
 
 * log reader thread reads the transactions from the log and writes them to the second table.
30
 
 * After a recovery the 2 tables should be identical.
31
 
 *
32
 
 * Built in crash points can be triggered to test that the recovery works correctly.
33
 
 *
34
 
 */
35
 
 
36
 
#ifdef UNIT_TEST
37
 
 
38
 
#include <stdlib.h>
39
 
#include <stdio.h>
40
 
#include <unistd.h>
41
 
#include <string.h>
42
 
#include <ctype.h>
43
 
#include <inttypes.h>
44
 
 
45
 
#include "cslib/CSConfig.h"
46
 
#include "cslib/CSGlobal.h"
47
 
#include "cslib/CSThread.h"
48
 
#include "cslib/CSStrUtil.h"
49
 
#include "cslib/CSStorage.h"
50
 
 
51
 
#include "trans_cache_ms.h"
52
 
#include "trans_log_ms.h"
53
 
 
54
 
#include "mysql.h"
55
 
 
56
 
#define CREATE_TABLE_BODY "\
57
 
 (\
58
 
  blob_ref INT NOT NULL AUTO_INCREMENT,\
59
 
  tab_id INT NOT NULL,\
60
 
  blob_id BIGINT NOT NULL, \
61
 
  committed BOOLEAN NOT NULL DEFAULT 0, \
62
 
  PRIMARY KEY (blob_ref, tab_id)\
63
 
)\
64
 
ENGINE = INNODB\
65
 
"
66
 
#ifdef LOG_TABLE
67
 
#undef LOG_TABLE
68
 
#endif
69
 
 
70
 
#define LOG_TABLE       "translog"
71
 
#define REF_TABLE       "transref_%d"
72
 
#define MAX_THREADS     20
73
 
 
74
 
#define A_DB_ID 123
75
 
 
76
 
#define TEST_DATABASE_NAME "TransTest"
77
 
static const char *user_name = "root";
78
 
static const char *user_passwd = "";
79
 
static int      port = 3306;
80
 
static const char *host = "localhost";
81
 
static int nap_time = 1000;
82
 
static int max_transaction = 10; // The maximum number of records generated per transaction
83
 
static bool dump_log = false, overflow_crash = false;
84
 
static int crash_site = 0;              // The location to crash at.
85
 
static int num_threads = 1;             // The number of writer threads.
86
 
//static int rate = 1000;                       // The maximum transactions per second to allow.
87
 
static time_t timeout = 60;             // How long to run for before crashing or shutting down.
88
 
static bool revover_only = false;
89
 
static bool recreate = false;
90
 
 
91
 
static uint32_t cache_size = 0, log_size = 0;
92
 
 
93
 
static MSTrans *trans_log;
94
 
 
95
 
static CSThreadList *thread_list;
96
 
 
97
 
static MYSQL *new_connection(bool check_for_db);
98
 
 
99
 
static CSThread *main_thread;
100
 
 
101
 
//------------------------------------------------
102
 
class TransTestThread : public CSDaemon {
103
 
public:
104
 
        TransTestThread(): 
105
 
                CSDaemon(thread_list),
106
 
                count(0),
107
 
                myActivity(0),
108
 
                log(NULL),
109
 
                stopit(false),
110
 
                finished(false),
111
 
                mysql(NULL)
112
 
                {}
113
 
 
114
 
         ~TransTestThread()
115
 
        {
116
 
                if (log)
117
 
                        log->release();
118
 
                        
119
 
                if (mysql)
120
 
                        mysql_close(mysql);
121
 
        }
122
 
         
123
 
        MSTrans *log;
124
 
        MYSQL *mysql;
125
 
        uint32_t count;
126
 
        uint32_t myActivity;
127
 
 
128
 
        bool stopit;
129
 
        bool finished;
130
 
        
131
 
        virtual bool doWork() {return true;}    
132
 
};
133
 
 
134
 
//------------------------------------------------
135
 
class TransTestWriterThread : public TransTestThread {
136
 
public:
137
 
        TransTestWriterThread():TransTestThread() {}
138
 
        
139
 
        uint32_t tab_id;
140
 
        FILE    *myLog;
141
 
        
142
 
        void generate_records();
143
 
        bool doWork() 
144
 
        {
145
 
                generate_records();
146
 
                finished = true;
147
 
                return true;
148
 
        }
149
 
        
150
 
        static TransTestWriterThread *newTransTestWriterThread(uint32_t id)
151
 
        {
152
 
                TransTestWriterThread *tt;
153
 
                enter_();
154
 
                
155
 
                
156
 
                new_(tt, TransTestWriterThread());
157
 
                
158
 
                char name[32];
159
 
                sprintf(name, "write_%d.log", id);
160
 
                if (recreate)
161
 
                        tt->myLog = fopen(name, "w+");
162
 
                else {
163
 
                        tt->myLog = fopen(name, "a+");
164
 
                        fprintf(tt->myLog, "====================================================\n");
165
 
                }
166
 
                
167
 
                tt->tab_id = id ;
168
 
                tt->mysql = new_connection(false);
169
 
                tt->log = trans_log;
170
 
                trans_log->retain();
171
 
                
172
 
                return_(tt); 
173
 
        }
174
 
        
175
 
        
176
 
};
177
 
 
178
 
//------------------------------------------------
179
 
class TransTestReaderThread : public TransTestThread {
180
 
public:
181
 
        TransTestReaderThread():TransTestThread(){}
182
 
        
183
 
        bool recovering;
184
 
        void processTransactionLog();
185
 
        bool doWork() 
186
 
        {
187
 
                processTransactionLog();
188
 
                return true;
189
 
        }
190
 
        
191
 
        static TransTestReaderThread *newTransTestReaderThread(MSTrans *log)
192
 
        {
193
 
                TransTestReaderThread *tt;
194
 
                enter_();
195
 
                
196
 
                new_(tt, TransTestReaderThread());
197
 
                tt->mysql = new_connection(false);
198
 
                tt->log = log;
199
 
                tt->log->retain();
200
 
                
201
 
                tt->log->txn_SetReader(tt); // The reader daemon is passed in unreferenced.
202
 
                tt->recovering = false;
203
 
                return_(tt); 
204
 
        }
205
 
        
206
 
        bool rec_found(uint64_t id, uint32_t tab_id) 
207
 
        {
208
 
                char stmt[100];
209
 
                MYSQL_RES *results = NULL;            
210
 
                bool found;
211
 
                
212
 
                sprintf(stmt, "SELECT blob_ref FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %"PRIu32"", id, tab_id); 
213
 
                if (mysql_query(mysql, stmt)) {
214
 
                        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
215
 
                        printf("%s\n", stmt);
216
 
                        exit(1);
217
 
                }
218
 
                
219
 
                
220
 
                results = mysql_store_result(mysql);
221
 
                if (!results){
222
 
                        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
223
 
                        exit(1);
224
 
                }
225
 
 
226
 
                found = (mysql_num_rows(results) == 1);         
227
 
                mysql_free_result(results);
228
 
                
229
 
                return found;
230
 
                
231
 
        }
232
 
        
233
 
        
234
 
};
235
 
 
236
 
TransTestReaderThread *TransReader;
237
 
//------------------------------------------------
238
 
static void report_mysql_error(MYSQL *mysql, int line, const char *msg)
239
 
{
240
 
        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), line);
241
 
        if (msg)
242
 
                printf("%s\n", msg);
243
 
        exit(1);
244
 
}
245
 
 
246
 
 
247
 
//------------------------------------------------
248
 
static MYSQL *new_connection(bool check_for_db)
249
 
{
250
 
        MYSQL *mysql;
251
 
 
252
 
        mysql = mysql_init(NULL);
253
 
        if (!mysql) {
254
 
                printf( "mysql_init() failed.\n");
255
 
                exit(1);
256
 
        }
257
 
 
258
 
        if (mysql_real_connect(mysql, host, user_name, user_passwd, NULL, port, NULL, 0) == NULL)
259
 
                report_mysql_error(mysql, __LINE__, "mysql_real_connect()");
260
 
 
261
 
        if (check_for_db) {
262
 
                MYSQL_RES *results = NULL;            
263
 
                
264
 
                if (mysql_query(mysql, "show databases like \"" TEST_DATABASE_NAME "\""))
265
 
                        report_mysql_error(mysql, __LINE__, "show databases like \"" TEST_DATABASE_NAME "\"");
266
 
                
267
 
                results = mysql_store_result(mysql);
268
 
                if (!results)
269
 
                        report_mysql_error(mysql, __LINE__, "mysql_store_result()");
270
 
 
271
 
 
272
 
                if (mysql_num_rows(results) != 1) {
273
 
                        if (mysql_query(mysql, "create database " TEST_DATABASE_NAME ))
274
 
                                report_mysql_error(mysql, __LINE__, "create database " TEST_DATABASE_NAME );
275
 
                }
276
 
                mysql_free_result(results);
277
 
        }
278
 
        
279
 
        if (mysql_query(mysql, "use " TEST_DATABASE_NAME ))
280
 
                report_mysql_error(mysql, __LINE__, "use " TEST_DATABASE_NAME );
281
 
 
282
 
        return mysql;
283
 
}
284
 
 
285
 
//------------------------------------------------
286
 
static void init_database(MYSQL *mysql, int cnt)
287
 
{
288
 
        char stmt[1024];
289
 
        
290
 
        unlink("ms-trans-log.dat");
291
 
        mysql_query(mysql, "drop table if exists " LOG_TABLE  ";");
292
 
        
293
 
        if (mysql_query(mysql, "create table " LOG_TABLE  CREATE_TABLE_BODY ";")){
294
 
                printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
295
 
                exit(1);
296
 
        }
297
 
 
298
 
        while (cnt) {
299
 
                sprintf(stmt, "drop table if exists " REF_TABLE  ";", cnt);
300
 
                mysql_query(mysql, stmt);
301
 
                sprintf(stmt, "create table " REF_TABLE  CREATE_TABLE_BODY ";", cnt);
302
 
                if (mysql_query(mysql, stmt)){
303
 
                        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
304
 
                        exit(1);
305
 
                }
306
 
                cnt--;
307
 
        }
308
 
}
309
 
 
310
 
 
311
 
//------------------------------------------------
312
 
static void display_help(const char *app)
313
 
{
314
 
        printf("\nUsage:\n");
315
 
        printf("%s -help | -r  [-t<num_threads>] | -d | [-n] [-sc <cache_size>] [-sl <log_size>] [-c <crash_site>]  [-t<num_threads>] [<timeout>]\n\n", app);
316
 
        
317
 
        printf("-r: Test recovery after a crash or shutdown.\n");
318
 
        printf("-d: Dump the transaction log.\n");
319
 
        printf("-n: Recreate the tables and recovery log.\n");
320
 
        printf("-c <crash_site>: Crash at this location rather than shutting down. Max = %d\n", MAX_CRASH_POINT+1);
321
 
        printf("-t<num_threads>: The number of writer threads to use, default is %d.\n", num_threads);
322
 
        //printf("-r<rate>: The number af records to be inserted per second, default is %d.\n", rate);
323
 
        printf("<timeout>: The number seconds the test should run before shuttingdown or crashing, default is %d.\n\n", timeout);
324
 
        exit(1);
325
 
}
326
 
 
327
 
//---------------------------------     
328
 
static void process_args(int argc, const char * argv[])
329
 
{
330
 
        if (argc < 2)
331
 
                return;
332
 
                
333
 
        for (int i = 1; i < argc; ) {
334
 
                if ( argv[i][0] != '-') { // Must be timeout
335
 
                        timeout = atoi(argv[i]);
336
 
                        i++;
337
 
                        if ((i != argc) || !timeout)
338
 
                                display_help(argv[0]);
339
 
                } else {
340
 
                        switch (argv[i][1]) {
341
 
                                case 'h':
342
 
                                        display_help(argv[0]);
343
 
                                        break;
344
 
                                        
345
 
                                case 'r':
346
 
                                        if (argc > 4 || argv[i][2])
347
 
                                                display_help(argv[0]);
348
 
                                        revover_only = true;
349
 
                                        i++;
350
 
                                        break;
351
 
                                        
352
 
                                case 'd':
353
 
                                        if (argc != 2 || argv[i][2])
354
 
                                                display_help(argv[0]);
355
 
                                        dump_log = true;
356
 
                                        i++;
357
 
                                        break;
358
 
                                        
359
 
                                case 'n':
360
 
                                        if (argv[i][2])
361
 
                                                display_help(argv[0]);
362
 
                                        recreate = true;
363
 
                                        i++;
364
 
                                        break;
365
 
                                        
366
 
                                case 'c':
367
 
                                        if (argv[i][2])
368
 
                                                display_help(argv[0]);
369
 
                                        i++;
370
 
                                        crash_site = atoi(argv[i]);
371
 
                                        if (crash_site == (MAX_CRASH_POINT + 1))
372
 
                                                overflow_crash = true;
373
 
                                        else if ((!crash_site) || (crash_site >  MAX_CRASH_POINT))
374
 
                                                display_help(argv[0]);
375
 
                                        i++;
376
 
                                        break;
377
 
                                        
378
 
                                case 's': {
379
 
                                                uint32_t size;
380
 
                                                
381
 
                                                size = atol(argv[i+1]);
382
 
                                                if (!size)
383
 
                                                        display_help(argv[0]);
384
 
                                                        
385
 
                                                if (argv[i][2] == 'c') 
386
 
                                                        cache_size = size;
387
 
                                                else if (argv[i][2] == 'l')
388
 
                                                        log_size = size;
389
 
                                                else 
390
 
                                                        display_help(argv[0]);
391
 
                                                
392
 
                                                i+=2;
393
 
                                        }
394
 
                                        break;
395
 
                                        
396
 
                                case 't':
397
 
                                        if (argv[i][2])
398
 
                                                display_help(argv[0]);
399
 
                                        i++;
400
 
                                        num_threads = atoi(argv[i]);
401
 
                                        if (!num_threads)
402
 
                                                display_help(argv[0]);
403
 
                                        i++;
404
 
                                        break;
405
 
/*                                      
406
 
                                case 'r':
407
 
                                        i++;
408
 
                                        rate = atoi(argv[i]);
409
 
                                        if (!rate)
410
 
                                                display_help(argv[0]);
411
 
                                        i++;
412
 
                                        break;
413
 
*/
414
 
                                default:
415
 
                                        display_help(argv[0]);
416
 
                        }
417
 
                        
418
 
                }
419
 
        }
420
 
}
421
 
 
422
 
//---------------------------------     
423
 
static void init_env()
424
 
{
425
 
        cs_init_memory();
426
 
        CSThread::startUp();
427
 
        if (!(main_thread = CSThread::newCSThread())) {
428
 
                CSException::logOSError(CS_CONTEXT, ENOMEM);
429
 
                exit(1);
430
 
        }
431
 
        
432
 
        CSThread::setSelf(main_thread);
433
 
        
434
 
        enter_();
435
 
        try_(a) {
436
 
                trans_log = MSTrans::txn_NewMSTrans("./ms-trans-log.dat", /*dump_log*/ true);
437
 
                new_(thread_list, CSThreadList()); 
438
 
        }
439
 
        catch_(a) {
440
 
                self->logException();
441
 
                CSThread::shutDown();
442
 
                exit(1);
443
 
        }
444
 
        cont_(a);
445
 
        
446
 
}
447
 
//---------------------------------     
448
 
static void deinit_env()
449
 
{
450
 
        if (thread_list) {
451
 
                thread_list->release();
452
 
                thread_list = NULL;
453
 
        }
454
 
        
455
 
        if (trans_log) {
456
 
                trans_log->release();
457
 
                trans_log = NULL;
458
 
        }
459
 
        
460
 
        if (main_thread) {
461
 
                main_thread->release();
462
 
                main_thread = NULL;
463
 
        }
464
 
        
465
 
        CSThread::shutDown();
466
 
        cs_exit_memory();
467
 
}
468
 
//---------------------------------     
469
 
static bool verify_database(MYSQL *mysql)
470
 
{
471
 
        MYSQL_RES **r_results, *l_results = NULL;            
472
 
        MYSQL_ROW r_record, l_record;
473
 
        bool ok = false;
474
 
        int i, log_row_cnt, ref_row_cnt = 0, tab_id;
475
 
        char stmt[1024];
476
 
        
477
 
        r_results = (MYSQL_RES **) malloc(num_threads * sizeof(MYSQL_RES *));
478
 
        
479
 
        if (mysql_query(mysql, "select * from "LOG_TABLE" where committed = 0 order by blob_ref")) 
480
 
                report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
481
 
                                                
482
 
        l_results = mysql_store_result(mysql);
483
 
        if (!l_results)
484
 
                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
485
 
 
486
 
        log_row_cnt = mysql_num_rows(l_results);
487
 
        mysql_free_result(l_results);
488
 
        if (log_row_cnt)
489
 
                printf("Uncommitted references: %d\n", log_row_cnt);
490
 
 
491
 
        //---------
492
 
        for (i =0; i < num_threads; i++) {
493
 
                sprintf(stmt, "select * from "REF_TABLE" order by blob_ref", i+1);
494
 
                if (mysql_query(mysql, stmt)) 
495
 
                        report_mysql_error(mysql, __LINE__, stmt);
496
 
                                                        
497
 
                r_results[i] = mysql_store_result(mysql);
498
 
                if (!r_results)
499
 
                        report_mysql_error(mysql, __LINE__, "mysql_store_result()");
500
 
                        
501
 
                ref_row_cnt += mysql_num_rows(r_results[i]);
502
 
        }       
503
 
        //---------
504
 
        if (mysql_query(mysql, "select * from "LOG_TABLE" order by blob_ref")) 
505
 
                report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
506
 
                                                
507
 
        l_results = mysql_store_result(mysql);
508
 
        if (!l_results)
509
 
                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
510
 
 
511
 
        log_row_cnt = mysql_num_rows(l_results);
512
 
        
513
 
        if (log_row_cnt != ref_row_cnt) {
514
 
                if (ref_row_cnt > log_row_cnt) {
515
 
                        printf("verify_database() Failed: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);
516
 
                        goto done;
517
 
                }
518
 
                
519
 
                printf("verify_database() Warnning: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);         
520
 
                printf("Possible unreferenced BLOBs\n");
521
 
        }
522
 
        
523
 
        if (log_row_cnt == ref_row_cnt) {
524
 
                for ( i = 0; i < log_row_cnt; i++) {
525
 
                        l_record = mysql_fetch_row(l_results);
526
 
                        tab_id = atol(l_record[1]);
527
 
                        r_record = mysql_fetch_row(r_results[tab_id-1]);                
528
 
                        if ((atol(l_record[0]) != atol(r_record[0])) ||
529
 
                                (atol(l_record[1]) != atol(r_record[1])) ||
530
 
                                (atol(l_record[2]) != atol(r_record[2]))) {
531
 
                                
532
 
                                printf("verify_database() Failed: in row %d, tab_id %d\n", i+1, tab_id);
533
 
                                printf("field 1:  %d =? %d\n", atol(l_record[0]), atol(r_record[0]));
534
 
                                printf("field 2:  %d =? %d\n", atol(l_record[1]), atol(r_record[1]));
535
 
                                printf("field 3:  %d =? %d\n", atol(l_record[2]), atol(r_record[2]));
536
 
                                goto done;
537
 
                        }
538
 
                                
539
 
                }
540
 
        } else { // The important thing is that there are no BLOBs in the ref tabels that are not in the log table.
541
 
 
542
 
                for (i =0; i < num_threads; i++) {
543
 
                        mysql_free_result(r_results[i]);
544
 
                        
545
 
                        sprintf(stmt, "select * from "REF_TABLE" where  blob_ref not in (select blob_ref from TransTest.translog where tab_id = %d)", i+1, i+1);
546
 
                        if (mysql_query(mysql, stmt)) 
547
 
                                report_mysql_error(mysql, __LINE__, stmt);
548
 
                                                                
549
 
                        r_results[i] = mysql_store_result(mysql);
550
 
                        if (!r_results)
551
 
                                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
552
 
                                
553
 
                        if (mysql_num_rows(r_results[i])) {
554
 
                                printf("verify_database() Failed, Missing BLOBs: %s\n", stmt);
555
 
                                goto done;
556
 
                        }
557
 
                }       
558
 
        }
559
 
        
560
 
        printf("verify_database() OK.\n");
561
 
        ok = true;
562
 
        
563
 
        done:
564
 
        
565
 
        for (i =0; i < num_threads; i++) {
566
 
                mysql_free_result(r_results[i]);
567
 
        }
568
 
        free(r_results);
569
 
        
570
 
        mysql_free_result(l_results);
571
 
        
572
 
#ifdef DEBUG    
573
 
        if (!ok) {
574
 
                trans_log->txn_DumpLog("trace.log");
575
 
        }
576
 
#endif  
577
 
        return ok;
578
 
}
579
 
 
580
 
//------------------------------------------------
581
 
void TransTestReaderThread::processTransactionLog()
582
 
{
583
 
        MSTransRec rec = {0};
584
 
        MS_TxnState state;
585
 
        char stmt[1024];
586
 
        uint32_t last_tid = 0;
587
 
        enter_();
588
 
        
589
 
        // Read in transactions from the log and update
590
 
        // the database table based on them.
591
 
        
592
 
        try_(a) {
593
 
                while (!myMustQuit && !stopit) {
594
 
                        // This will sleep while waiting for the next 
595
 
                        // completed transaction.
596
 
                        log->txn_GetNextTransaction(&rec, &state); 
597
 
                        if (myMustQuit)
598
 
                                break;
599
 
 
600
 
                        myActivity++;
601
 
#ifdef CHECK_TIDS
602
 
                        if (num_threads == 1) {
603
 
                                ASSERT( ((last_tid + 1) == rec.tr_id) || (last_tid  == rec.tr_id) || !last_tid);
604
 
                                last_tid = rec.tr_id;
605
 
                        }
606
 
#endif                  
607
 
                        if (!recovering) 
608
 
                                count++;
609
 
                        
610
 
                        switch (TRANS_TYPE(rec.tr_type)) {
611
 
                                case MS_ReferenceTxn:
612
 
                                case MS_DereferenceTxn:
613
 
                                case MS_RollBackTxn:
614
 
                                case MS_CommitTxn:
615
 
                                case MS_RecoveredTxn:
616
 
                                break;
617
 
                                default:
618
 
                                        printf("Unexpected transaction type: %d\n", rec.tr_type);
619
 
                                        exit(1);                                                        
620
 
                        }
621
 
                        
622
 
                        if (state == MS_Committed){
623
 
                                // Dereferences are applied when the transaction is commited.
624
 
                                // References are applied imediatly and removed if the transaction is rolled back.
625
 
                                if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) {
626
 
                                        sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
627
 
                                        if (mysql_query(mysql, stmt))  
628
 
                                                report_mysql_error(mysql, __LINE__, stmt);
629
 
                                } else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
630
 
                                        sprintf(stmt, "UPDATE "LOG_TABLE" SET committed = 1 WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
631
 
                                        if (mysql_query(mysql, stmt))  
632
 
                                                report_mysql_error(mysql, __LINE__, stmt);
633
 
                                }
634
 
                        } else if (state == MS_RolledBack) { 
635
 
                                //printf("ROLLBACK!\n");
636
 
                                if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
637
 
                                        sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
638
 
                                        if (mysql_query(mysql, stmt))  
639
 
                                                report_mysql_error(mysql, __LINE__, stmt);
640
 
                                }
641
 
                        } else if (state == MS_Recovered) { 
642
 
                                printf("Recovered transaction being ignored:\n");
643
 
                                printf("blob_ref = %"PRIu64", tab_id = %d, blob_id = %"PRIu64"\n\n", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
644
 
                        } else {
645
 
                                printf("Unexpected transaction state: %d\n", state);
646
 
                                exit(1);                                                        
647
 
                        }
648
 
                        
649
 
                        
650
 
                }
651
 
        }
652
 
        catch_(a) {
653
 
                self->logException();
654
 
                printf("\n\n!!!!!!!! THE TRANSACTION LOG READER DIED! !!!!!!!!!!!\n\n");
655
 
                if (!myMustQuit && !stopit)
656
 
                        exit(1);
657
 
        }
658
 
        cont_(a);
659
 
        printf("The transaction log reader shutting down.\n");
660
 
        exit_();
661
 
}
662
 
 
663
 
//------------------------------------------------
664
 
void TransTestWriterThread::generate_records()
665
 
{
666
 
 
667
 
        MS_Txn  txn_type;               
668
 
        uint64_t        blob_id;        
669
 
        uint64_t        blob_ref_id;    
670
 
        int tsize, i;
671
 
        bool do_delete;
672
 
                
673
 
        char stmt[1024];
674
 
        enter_();
675
 
 
676
 
        try_(a) {
677
 
                while (!myMustQuit && !stopit) {
678
 
                
679
 
                        myActivity++;
680
 
                        usleep(nap_time); // Give up a bit of time
681
 
                        if (myMustQuit || stopit)
682
 
                                break;
683
 
                                
684
 
                        tsize = rand() % max_transaction;
685
 
                        
686
 
                        if (mysql_autocommit(mysql, 0))
687
 
                                report_mysql_error(mysql, __LINE__, "mysql_autocommit()");
688
 
                                
689
 
                        i = 0;
690
 
                        do {
691
 
                                do_delete = ((rand() %2) == 0);
692
 
                                
693
 
                                // decide if this is an insert or delete
694
 
                                if (do_delete) {
695
 
                                        MYSQL_RES *results = NULL;            
696
 
                                        MYSQL_ROW record;
697
 
                                        int cnt;
698
 
                                        
699
 
                                        // If we are deleting then randomly select a record to delete
700
 
                                        // and delete it. 
701
 
                                        
702
 
                                        txn_type = MS_DereferenceTxn;
703
 
 
704
 
                                        sprintf(stmt, "select * from "REF_TABLE, tab_id); 
705
 
                                        if (mysql_query(mysql, stmt)) 
706
 
                                                report_mysql_error(mysql, __LINE__, stmt);
707
 
                                                
708
 
                                        results = mysql_store_result(mysql);
709
 
                                        if (!results)
710
 
                                                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
711
 
                                                
712
 
                                        cnt = mysql_num_rows(results);
713
 
                                        if (!cnt)
714
 
                                                do_delete = false; // There is nothing to delete
715
 
                                        else {
716
 
                                                mysql_data_seek(results, rand()%cnt);
717
 
                                                record = mysql_fetch_row(results);
718
 
                                                        
719
 
                                                blob_ref_id = atol(record[0]);
720
 
                                                blob_id = atol(record[2]);
721
 
                                                
722
 
                                                sprintf(stmt, "DELETE FROM "REF_TABLE" WHERE blob_ref = %"PRIu64" AND blob_id = %"PRIu64"", tab_id, blob_ref_id, blob_id); 
723
 
                                                if (mysql_query(mysql, stmt))  
724
 
                                                        report_mysql_error(mysql, __LINE__, stmt);
725
 
                                                        
726
 
                                                if (mysql_affected_rows(mysql) == 0)
727
 
                                                        do_delete = false; // Another thread must have deleted the row first.
728
 
                                                else
729
 
                                                        fprintf(myLog, "DELETE %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id); 
730
 
                                        }
731
 
                                        
732
 
                                        mysql_free_result(results);
733
 
                                } 
734
 
                                
735
 
                                if (!do_delete) {
736
 
                                        blob_id = self->myTID; // Assign the tid as the blob id to help with debugging.
737
 
                                        txn_type = MS_ReferenceTxn;
738
 
                                        
739
 
                                        sprintf(stmt, "INSERT INTO "REF_TABLE" VALUES( NULL, %d, %"PRIu64", 0)", tab_id, tab_id, blob_id); 
740
 
                                        if (mysql_query(mysql, stmt)) 
741
 
                                                report_mysql_error(mysql, __LINE__, stmt);
742
 
                                                
743
 
                                        blob_ref_id = mysql_insert_id(mysql);
744
 
                                        if (!blob_ref_id)
745
 
                                                report_mysql_error(mysql, __LINE__, "mysql_insert_id() returned 0");
746
 
                                        
747
 
                                        fprintf(myLog, "INSERT %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);   
748
 
                                        // Apply the blob reference now. This will be undone if the transaction is rolled back.
749
 
                                        sprintf(stmt, "INSERT INTO "LOG_TABLE" VALUES(%"PRIu64", %d, %"PRIu64", 0)", blob_ref_id, tab_id, blob_id); 
750
 
                                        if (mysql_query(mysql, stmt)) 
751
 
                                                report_mysql_error(mysql, __LINE__, stmt);
752
 
                                }
753
 
 
754
 
                                i++;
755
 
                                count++;
756
 
                                if (i >= tsize) { //Commit the database transaction before the log transaction.
757
 
                                        bool rollback;
758
 
                                        
759
 
                                        rollback = ((tsize > 0) && ((rand() % 1000) == 0));
760
 
                                        if (rollback) {
761
 
                                                printf("Rollback\n");
762
 
                                                if (mysql_rollback(mysql)) // commit the staement to the database,
763
 
                                                        report_mysql_error(mysql, __LINE__, "mysql_rollback()");        
764
 
                                                fprintf(myLog, "Rollback %"PRIu32"\n", self->myTID);    
765
 
                                                log->txn_LogTransaction(MS_RollBackTxn);
766
 
                                        } else {
767
 
                                                if (mysql_commit(mysql)) // commit the staement to the database,
768
 
                                                        report_mysql_error(mysql, __LINE__, "mysql_commit()");  
769
 
                                                fprintf(myLog, "Commit %"PRIu32"\n", self->myTID);      
770
 
                                                log->txn_LogTransaction(txn_type, true, A_DB_ID, tab_id, blob_id, blob_ref_id);
771
 
                                        }
772
 
                                } else
773
 
                                        log->txn_LogTransaction(txn_type, false, A_DB_ID, tab_id, blob_id, blob_ref_id);
774
 
                                                                
775
 
                        } while ( i < tsize);
776
 
                                                
777
 
                }
778
 
        }
779
 
        
780
 
        catch_(a) {
781
 
                self->logException();
782
 
                printf("\n\nA writer thread for table %d died! \n\n", tab_id);
783
 
                if (i == tsize) {
784
 
                        printf(" It is possible that the last %d operations on table %d were committed to the database but not to the log.\n", tsize, tab_id);
785
 
                }
786
 
                if (!myMustQuit && !stopit)
787
 
                        exit(1);
788
 
        }
789
 
        cont_(a);
790
 
        printf("Writer thread for table %d is shutting down.\n", tab_id);
791
 
        exit_();
792
 
}
793
 
 
794
 
// SELECT * FROM TransTest.translog where  blob_ref not in (select blob_ref from TransTest.transref)
795
 
// SELECT * FROM TransTest.transref_1 where  blob_ref not in (select blob_ref from TransTest.translog where tab_id = 1)
796
 
// SELECT * FROM TransTest.translog where  tab_id = 1 AND blob_ref not in (select blob_ref from TransTest.transref_1)
797
 
// select count(*) from TransTest.translog where committed = 1
798
 
//---------------------------------     
799
 
int main (int argc, const char * argv[]) 
800
 
{
801
 
        MYSQL *mysql;
802
 
        TransTestWriterThread **writer = NULL;
803
 
        int rtc = 1;
804
 
        
805
 
        process_args(argc, argv);
806
 
        
807
 
        mysql = new_connection(true);
808
 
        
809
 
        if (recreate)
810
 
                init_database(mysql, num_threads);
811
 
                
812
 
        init_env();
813
 
        enter_();
814
 
        
815
 
        if (dump_log) {
816
 
                printf("LOG dumped\n");
817
 
                exit(1);
818
 
        }
819
 
        
820
 
        TransReader = TransTestReaderThread::newTransTestReaderThread(trans_log);
821
 
        push_(TransReader);
822
 
        TransReader->recovering = true;
823
 
        TransReader->start();
824
 
        
825
 
        // wait until the recovery is complete.
826
 
        while (trans_log->txn_GetNumRecords())
827
 
                usleep(100);
828
 
                
829
 
        TransReader->recovering = false;
830
 
        
831
 
        if (log_size)
832
 
                trans_log->txn_SetLogSize(log_size);
833
 
                
834
 
        if (cache_size)
835
 
                trans_log->txn_SetCacheSize(cache_size);
836
 
                
837
 
        if (revover_only) {
838
 
                TransReader->stopit = true;
839
 
                if (verify_database(mysql))
840
 
                        rtc = 0;
841
 
                goto done;
842
 
        }
843
 
        
844
 
        try_(a) {
845
 
                writer = (TransTestWriterThread **) cs_malloc(num_threads * sizeof(TransTestWriterThread *));
846
 
                for (int i = 0; i < num_threads; i++) {
847
 
                        TransTestWriterThread *wt = TransTestWriterThread::newTransTestWriterThread(i+1);
848
 
                        wt->start();
849
 
                        writer[i] = wt;
850
 
                }
851
 
        
852
 
                printf("Timeout: %d seconds\n", timeout); 
853
 
                timeout += time(NULL);
854
 
                int header = 0;
855
 
                while (timeout > time(NULL)) {
856
 
                        MSTransStatsRec stats;
857
 
                        self->sleep(1000);
858
 
                        trans_log->txn_GetStats(&stats);
859
 
                        
860
 
                        
861
 
                        if (!(header%20)) {
862
 
                                for (int i = 0; i < num_threads; i++) {                         
863
 
                                        if (writer[i]->myActivity == 0) {
864
 
                                                printf("Writer thread %d HUNG!!!\n", i);
865
 
                                        }
866
 
                                        writer[i]->myActivity = 0;
867
 
                                }
868
 
                                
869
 
                                if (TransReader->myActivity == 0) {
870
 
                                        printf("Reader thread HUNG!!!\n");
871
 
                                }
872
 
                                TransReader->myActivity = 0;
873
 
                                        
874
 
                                printf("%s | %s | %s | %s | %s | %s | %s | %s\n", "LogSize", "Full", "MaxSize", "Overflows", "Overflowing", "CacheSize", "Cache Used", "Cache Hit");
875
 
                        }
876
 
                        header++;
877
 
                        //printf("Writes: %d \t\t Reads: %d \t%d \t start: %"PRIu64"\t\t eol:%"PRIu64"\n", count, TransReader->count, count - TransReader->count, trans_log->txn_Start, trans_log->txn_EOL);
878
 
                        printf("%7llu | %3d%% | %7llu | %9d | %11s | %9d | %9d%% | %9d%%\n",// | \t\t\t%"PRIu64" \t%"PRIu64"\n", 
879
 
                                stats.ts_LogSize,
880
 
                                stats.ts_PercentFull,
881
 
                                stats.ts_MaxSize,
882
 
                                stats.ts_OverflowCount,
883
 
                                (stats.ts_IsOverflowing)?"Over Flow": "   ---   ",
884
 
                                stats.ts_TransCacheSize,
885
 
                                stats.ts_PercentTransCacheUsed,
886
 
                                stats.ts_PercentCacheHit//, trans_log->txn_Start, trans_log->txn_EOL
887
 
                                );
888
 
                                
889
 
                                if (stats.ts_IsOverflowing && overflow_crash) {
890
 
                                        printf("Simulating crash while in overflow\n");
891
 
                                        exit(1);
892
 
                                }
893
 
                }
894
 
 
895
 
#ifdef CRASH_TEST               
896
 
                if (crash_site) {
897
 
                        printf("Crashing at crash site %d\n", crash_site);
898
 
                        trans_test_crash_point = crash_site;
899
 
                        // set the crash site and wait to die.
900
 
                        while(1)
901
 
                                self->sleep(1000);
902
 
                }
903
 
#endif
904
 
                
905
 
                printf("Shutting down the writer threads:\n");
906
 
                for (int i = 0; i < num_threads; i++) {
907
 
                        writer[i]->stopit = true;
908
 
                }
909
 
                
910
 
                TransReader->stopit = true;
911
 
                // Give the writers a chance to shutdown by themselves.
912
 
                int cnt = 100;
913
 
                while (cnt) {
914
 
                        int i;
915
 
                        for (i = 0; i < num_threads && writer[i]->finished; i++);
916
 
                        if (i == num_threads && TransReader->finished)
917
 
                                break;
918
 
                        self->sleep(10);        
919
 
                        cnt--;                  
920
 
                }
921
 
                
922
 
                for (int i = 0; i < num_threads; i++) {
923
 
                        writer[i]->stop();
924
 
                }
925
 
                
926
 
        }
927
 
        rtc = 0;
928
 
        catch_(a) {
929
 
                printf("Main thread abort.\n");
930
 
                self->logException();
931
 
        }
932
 
        cont_(a);
933
 
        if (writer) {
934
 
                for (int i = 0; i < num_threads; i++) {
935
 
                        writer[i]->stop();
936
 
                        writer[i]->release();
937
 
                }
938
 
                cs_free(writer);
939
 
        }
940
 
                
941
 
done:
942
 
        TransReader->stop();
943
 
        release_(TransReader);
944
 
        
945
 
        outer_();
946
 
        
947
 
        thread_list->stopAllThreads();
948
 
        deinit_env();
949
 
        mysql_close(mysql);
950
 
        exit(rtc);
951
 
}
952
 
 
953
 
#endif // UNIT_TEST