~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/TransTest.cc

  • Committer: Monty Taylor
  • Date: 2010-07-04 20:02:43 UTC
  • mfrom: (1548.2.40 drizzle_pbms)
  • mto: This revision was merged to the branch mainline in revision 1644.
  • Revision ID: mordred@inaugust.com-20100704200243-2vkq9gi6ysauj2tb
Merge PBMS from Barry.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Barry Leslie
 
20
 *
 
21
 * 2009-06-17
 
22
 *
 
23
 * H&G2JCtL
 
24
 *
 
25
 * PBMS transaction handling test driver.
 
26
 *
 
27
 * This is a test driver for the PBMS transaction log. It uses 2 tables in a database and
 
28
 * inserts transaction records into 1 while writing them to the transaction log. The transaction
 
29
 * log reader thread reads the transactions from the log and writes them to the second table.
 
30
 * After a recovery the 2 tables should be identical.
 
31
 *
 
32
 * Built in crash points can be triggered to test that the recovery works correctly.
 
33
 *
 
34
 */
 
35
 
 
36
#ifdef UNIT_TEST
 
37
 
 
38
#include <stdlib.h>
 
39
#include <stdio.h>
 
40
#include <unistd.h>
 
41
#include <string.h>
 
42
#include <ctype.h>
 
43
#include <inttypes.h>
 
44
 
 
45
#include "cslib/CSConfig.h"
 
46
#include "cslib/CSGlobal.h"
 
47
#include "cslib/CSThread.h"
 
48
#include "cslib/CSStrUtil.h"
 
49
#include "cslib/CSStorage.h"
 
50
 
 
51
#include "trans_cache_ms.h"
 
52
#include "trans_log_ms.h"
 
53
 
 
54
#include "mysql.h"
 
55
 
 
56
#define CREATE_TABLE_BODY "\
 
57
 (\
 
58
  blob_ref INT NOT NULL AUTO_INCREMENT,\
 
59
  tab_id INT NOT NULL,\
 
60
  blob_id BIGINT NOT NULL, \
 
61
  committed BOOLEAN NOT NULL DEFAULT 0, \
 
62
  PRIMARY KEY (blob_ref, tab_id)\
 
63
)\
 
64
ENGINE = INNODB\
 
65
"
 
66
#ifdef LOG_TABLE
 
67
#undef LOG_TABLE
 
68
#endif
 
69
 
 
70
#define LOG_TABLE       "translog"
 
71
#define REF_TABLE       "transref_%d"
 
72
#define MAX_THREADS     20
 
73
 
 
74
#define A_DB_ID 123
 
75
 
 
76
#define TEST_DATABASE_NAME "TransTest"
 
77
static const char *user_name = "root";
 
78
static const char *user_passwd = "";
 
79
static int      port = 3306;
 
80
static const char *host = "localhost";
 
81
static int nap_time = 1000;
 
82
static int max_transaction = 10; // The maximum number of records generated per transaction
 
83
static bool dump_log = false, overflow_crash = false;
 
84
static int crash_site = 0;              // The location to crash at.
 
85
static int num_threads = 1;             // The number of writer threads.
 
86
//static int rate = 1000;                       // The maximum transactions per second to allow.
 
87
static time_t timeout = 60;             // How long to run for before crashing or shutting down.
 
88
static bool revover_only = false;
 
89
static bool recreate = false;
 
90
 
 
91
static uint32_t cache_size = 0, log_size = 0;
 
92
 
 
93
static MSTrans *trans_log;
 
94
 
 
95
static CSThreadList *thread_list;
 
96
 
 
97
static MYSQL *new_connection(bool check_for_db);
 
98
 
 
99
static CSThread *main_thread;
 
100
 
 
101
//------------------------------------------------
 
102
class TransTestThread : public CSDaemon {
 
103
public:
 
104
        TransTestThread(): 
 
105
                CSDaemon(thread_list),
 
106
                count(0),
 
107
                myActivity(0),
 
108
                log(NULL),
 
109
                stopit(false),
 
110
                finished(false),
 
111
                mysql(NULL)
 
112
                {}
 
113
 
 
114
         ~TransTestThread()
 
115
        {
 
116
                if (log)
 
117
                        log->release();
 
118
                        
 
119
                if (mysql)
 
120
                        mysql_close(mysql);
 
121
        }
 
122
         
 
123
        MSTrans *log;
 
124
        MYSQL *mysql;
 
125
        uint32_t count;
 
126
        uint32_t myActivity;
 
127
 
 
128
        bool stopit;
 
129
        bool finished;
 
130
        
 
131
        virtual bool doWork() {return true;}    
 
132
};
 
133
 
 
134
//------------------------------------------------
 
135
class TransTestWriterThread : public TransTestThread {
 
136
public:
 
137
        TransTestWriterThread():TransTestThread() {}
 
138
        
 
139
        uint32_t tab_id;
 
140
        FILE    *myLog;
 
141
        
 
142
        void generate_records();
 
143
        bool doWork() 
 
144
        {
 
145
                generate_records();
 
146
                finished = true;
 
147
                return true;
 
148
        }
 
149
        
 
150
        static TransTestWriterThread *newTransTestWriterThread(uint32_t id)
 
151
        {
 
152
                TransTestWriterThread *tt;
 
153
                enter_();
 
154
                
 
155
                
 
156
                new_(tt, TransTestWriterThread());
 
157
                
 
158
                char name[32];
 
159
                sprintf(name, "write_%d.log", id);
 
160
                if (recreate)
 
161
                        tt->myLog = fopen(name, "w+");
 
162
                else {
 
163
                        tt->myLog = fopen(name, "a+");
 
164
                        fprintf(tt->myLog, "====================================================\n");
 
165
                }
 
166
                
 
167
                tt->tab_id = id ;
 
168
                tt->mysql = new_connection(false);
 
169
                tt->log = trans_log;
 
170
                trans_log->retain();
 
171
                
 
172
                return_(tt); 
 
173
        }
 
174
        
 
175
        
 
176
};
 
177
 
 
178
//------------------------------------------------
 
179
class TransTestReaderThread : public TransTestThread {
 
180
public:
 
181
        TransTestReaderThread():TransTestThread(){}
 
182
        
 
183
        bool recovering;
 
184
        void processTransactionLog();
 
185
        bool doWork() 
 
186
        {
 
187
                processTransactionLog();
 
188
                return true;
 
189
        }
 
190
        
 
191
        static TransTestReaderThread *newTransTestReaderThread(MSTrans *log)
 
192
        {
 
193
                TransTestReaderThread *tt;
 
194
                enter_();
 
195
                
 
196
                new_(tt, TransTestReaderThread());
 
197
                tt->mysql = new_connection(false);
 
198
                tt->log = log;
 
199
                tt->log->retain();
 
200
                
 
201
                tt->log->txn_SetReader(tt); // The reader daemon is passed in unreferenced.
 
202
                tt->recovering = false;
 
203
                return_(tt); 
 
204
        }
 
205
        
 
206
        bool rec_found(uint64_t id, uint32_t tab_id) 
 
207
        {
 
208
                char stmt[100];
 
209
                MYSQL_RES *results = NULL;            
 
210
                bool found;
 
211
                
 
212
                sprintf(stmt, "SELECT blob_ref FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %"PRIu32"", id, tab_id); 
 
213
                if (mysql_query(mysql, stmt)) {
 
214
                        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
 
215
                        printf("%s\n", stmt);
 
216
                        exit(1);
 
217
                }
 
218
                
 
219
                
 
220
                results = mysql_store_result(mysql);
 
221
                if (!results){
 
222
                        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
 
223
                        exit(1);
 
224
                }
 
225
 
 
226
                found = (mysql_num_rows(results) == 1);         
 
227
                mysql_free_result(results);
 
228
                
 
229
                return found;
 
230
                
 
231
        }
 
232
        
 
233
        
 
234
};
 
235
 
 
236
TransTestReaderThread *TransReader;
 
237
//------------------------------------------------
 
238
static void report_mysql_error(MYSQL *mysql, int line, const char *msg)
 
239
{
 
240
        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), line);
 
241
        if (msg)
 
242
                printf("%s\n", msg);
 
243
        exit(1);
 
244
}
 
245
 
 
246
 
 
247
//------------------------------------------------
 
248
static MYSQL *new_connection(bool check_for_db)
 
249
{
 
250
        MYSQL *mysql;
 
251
 
 
252
        mysql = mysql_init(NULL);
 
253
        if (!mysql) {
 
254
                printf( "mysql_init() failed.\n");
 
255
                exit(1);
 
256
        }
 
257
 
 
258
        if (mysql_real_connect(mysql, host, user_name, user_passwd, NULL, port, NULL, 0) == NULL)
 
259
                report_mysql_error(mysql, __LINE__, "mysql_real_connect()");
 
260
 
 
261
        if (check_for_db) {
 
262
                MYSQL_RES *results = NULL;            
 
263
                
 
264
                if (mysql_query(mysql, "show databases like \"" TEST_DATABASE_NAME "\""))
 
265
                        report_mysql_error(mysql, __LINE__, "show databases like \"" TEST_DATABASE_NAME "\"");
 
266
                
 
267
                results = mysql_store_result(mysql);
 
268
                if (!results)
 
269
                        report_mysql_error(mysql, __LINE__, "mysql_store_result()");
 
270
 
 
271
 
 
272
                if (mysql_num_rows(results) != 1) {
 
273
                        if (mysql_query(mysql, "create database " TEST_DATABASE_NAME ))
 
274
                                report_mysql_error(mysql, __LINE__, "create database " TEST_DATABASE_NAME );
 
275
                }
 
276
                mysql_free_result(results);
 
277
        }
 
278
        
 
279
        if (mysql_query(mysql, "use " TEST_DATABASE_NAME ))
 
280
                report_mysql_error(mysql, __LINE__, "use " TEST_DATABASE_NAME );
 
281
 
 
282
        return mysql;
 
283
}
 
284
 
 
285
//------------------------------------------------
 
286
static void init_database(MYSQL *mysql, int cnt)
 
287
{
 
288
        char stmt[1024];
 
289
        
 
290
        unlink("ms-trans-log.dat");
 
291
        mysql_query(mysql, "drop table if exists " LOG_TABLE  ";");
 
292
        
 
293
        if (mysql_query(mysql, "create table " LOG_TABLE  CREATE_TABLE_BODY ";")){
 
294
                printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
 
295
                exit(1);
 
296
        }
 
297
 
 
298
        while (cnt) {
 
299
                sprintf(stmt, "drop table if exists " REF_TABLE  ";", cnt);
 
300
                mysql_query(mysql, stmt);
 
301
                sprintf(stmt, "create table " REF_TABLE  CREATE_TABLE_BODY ";", cnt);
 
302
                if (mysql_query(mysql, stmt)){
 
303
                        printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
 
304
                        exit(1);
 
305
                }
 
306
                cnt--;
 
307
        }
 
308
}
 
309
 
 
310
 
 
311
//------------------------------------------------
 
312
static void display_help(const char *app)
 
313
{
 
314
        printf("\nUsage:\n");
 
315
        printf("%s -help | -r  [-t<num_threads>] | -d | [-n] [-sc <cache_size>] [-sl <log_size>] [-c <crash_site>]  [-t<num_threads>] [<timeout>]\n\n", app);
 
316
        
 
317
        printf("-r: Test recovery after a crash or shutdown.\n");
 
318
        printf("-d: Dump the transaction log.\n");
 
319
        printf("-n: Recreate the tables and recovery log.\n");
 
320
        printf("-c <crash_site>: Crash at this location rather than shutting down. Max = %d\n", MAX_CRASH_POINT+1);
 
321
        printf("-t<num_threads>: The number of writer threads to use, default is %d.\n", num_threads);
 
322
        //printf("-r<rate>: The number af records to be inserted per second, default is %d.\n", rate);
 
323
        printf("<timeout>: The number seconds the test should run before shuttingdown or crashing, default is %d.\n\n", timeout);
 
324
        exit(1);
 
325
}
 
326
 
 
327
//---------------------------------     
 
328
static void process_args(int argc, const char * argv[])
 
329
{
 
330
        if (argc < 2)
 
331
                return;
 
332
                
 
333
        for (int i = 1; i < argc; ) {
 
334
                if ( argv[i][0] != '-') { // Must be timeout
 
335
                        timeout = atoi(argv[i]);
 
336
                        i++;
 
337
                        if ((i != argc) || !timeout)
 
338
                                display_help(argv[0]);
 
339
                } else {
 
340
                        switch (argv[i][1]) {
 
341
                                case 'h':
 
342
                                        display_help(argv[0]);
 
343
                                        break;
 
344
                                        
 
345
                                case 'r':
 
346
                                        if (argc > 4 || argv[i][2])
 
347
                                                display_help(argv[0]);
 
348
                                        revover_only = true;
 
349
                                        i++;
 
350
                                        break;
 
351
                                        
 
352
                                case 'd':
 
353
                                        if (argc != 2 || argv[i][2])
 
354
                                                display_help(argv[0]);
 
355
                                        dump_log = true;
 
356
                                        i++;
 
357
                                        break;
 
358
                                        
 
359
                                case 'n':
 
360
                                        if (argv[i][2])
 
361
                                                display_help(argv[0]);
 
362
                                        recreate = true;
 
363
                                        i++;
 
364
                                        break;
 
365
                                        
 
366
                                case 'c':
 
367
                                        if (argv[i][2])
 
368
                                                display_help(argv[0]);
 
369
                                        i++;
 
370
                                        crash_site = atoi(argv[i]);
 
371
                                        if (crash_site == (MAX_CRASH_POINT + 1))
 
372
                                                overflow_crash = true;
 
373
                                        else if ((!crash_site) || (crash_site >  MAX_CRASH_POINT))
 
374
                                                display_help(argv[0]);
 
375
                                        i++;
 
376
                                        break;
 
377
                                        
 
378
                                case 's': {
 
379
                                                uint32_t size;
 
380
                                                
 
381
                                                size = atol(argv[i+1]);
 
382
                                                if (!size)
 
383
                                                        display_help(argv[0]);
 
384
                                                        
 
385
                                                if (argv[i][2] == 'c') 
 
386
                                                        cache_size = size;
 
387
                                                else if (argv[i][2] == 'l')
 
388
                                                        log_size = size;
 
389
                                                else 
 
390
                                                        display_help(argv[0]);
 
391
                                                
 
392
                                                i+=2;
 
393
                                        }
 
394
                                        break;
 
395
                                        
 
396
                                case 't':
 
397
                                        if (argv[i][2])
 
398
                                                display_help(argv[0]);
 
399
                                        i++;
 
400
                                        num_threads = atoi(argv[i]);
 
401
                                        if (!num_threads)
 
402
                                                display_help(argv[0]);
 
403
                                        i++;
 
404
                                        break;
 
405
/*                                      
 
406
                                case 'r':
 
407
                                        i++;
 
408
                                        rate = atoi(argv[i]);
 
409
                                        if (!rate)
 
410
                                                display_help(argv[0]);
 
411
                                        i++;
 
412
                                        break;
 
413
*/
 
414
                                default:
 
415
                                        display_help(argv[0]);
 
416
                        }
 
417
                        
 
418
                }
 
419
        }
 
420
}
 
421
 
 
422
//---------------------------------     
 
423
static void init_env()
 
424
{
 
425
        cs_init_memory();
 
426
        CSThread::startUp();
 
427
        if (!(main_thread = CSThread::newCSThread())) {
 
428
                CSException::logOSError(CS_CONTEXT, ENOMEM);
 
429
                exit(1);
 
430
        }
 
431
        
 
432
        CSThread::setSelf(main_thread);
 
433
        
 
434
        enter_();
 
435
        try_(a) {
 
436
                trans_log = MSTrans::txn_NewMSTrans("./ms-trans-log.dat", /*dump_log*/ true);
 
437
                new_(thread_list, CSThreadList()); 
 
438
        }
 
439
        catch_(a) {
 
440
                self->logException();
 
441
                CSThread::shutDown();
 
442
                exit(1);
 
443
        }
 
444
        cont_(a);
 
445
        
 
446
}
 
447
//---------------------------------     
 
448
static void deinit_env()
 
449
{
 
450
        if (thread_list) {
 
451
                thread_list->release();
 
452
                thread_list = NULL;
 
453
        }
 
454
        
 
455
        if (trans_log) {
 
456
                trans_log->release();
 
457
                trans_log = NULL;
 
458
        }
 
459
        
 
460
        if (main_thread) {
 
461
                main_thread->release();
 
462
                main_thread = NULL;
 
463
        }
 
464
        
 
465
        CSThread::shutDown();
 
466
        cs_exit_memory();
 
467
}
 
468
//---------------------------------     
 
469
static bool verify_database(MYSQL *mysql)
 
470
{
 
471
        MYSQL_RES **r_results, *l_results = NULL;            
 
472
        MYSQL_ROW r_record, l_record;
 
473
        bool ok = false;
 
474
        int i, log_row_cnt, ref_row_cnt = 0, tab_id;
 
475
        char stmt[1024];
 
476
        
 
477
        r_results = (MYSQL_RES **) malloc(num_threads * sizeof(MYSQL_RES *));
 
478
        
 
479
        if (mysql_query(mysql, "select * from "LOG_TABLE" where committed = 0 order by blob_ref")) 
 
480
                report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
 
481
                                                
 
482
        l_results = mysql_store_result(mysql);
 
483
        if (!l_results)
 
484
                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
 
485
 
 
486
        log_row_cnt = mysql_num_rows(l_results);
 
487
        mysql_free_result(l_results);
 
488
        if (log_row_cnt)
 
489
                printf("Uncommitted references: %d\n", log_row_cnt);
 
490
 
 
491
        //---------
 
492
        for (i =0; i < num_threads; i++) {
 
493
                sprintf(stmt, "select * from "REF_TABLE" order by blob_ref", i+1);
 
494
                if (mysql_query(mysql, stmt)) 
 
495
                        report_mysql_error(mysql, __LINE__, stmt);
 
496
                                                        
 
497
                r_results[i] = mysql_store_result(mysql);
 
498
                if (!r_results)
 
499
                        report_mysql_error(mysql, __LINE__, "mysql_store_result()");
 
500
                        
 
501
                ref_row_cnt += mysql_num_rows(r_results[i]);
 
502
        }       
 
503
        //---------
 
504
        if (mysql_query(mysql, "select * from "LOG_TABLE" order by blob_ref")) 
 
505
                report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
 
506
                                                
 
507
        l_results = mysql_store_result(mysql);
 
508
        if (!l_results)
 
509
                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
 
510
 
 
511
        log_row_cnt = mysql_num_rows(l_results);
 
512
        
 
513
        if (log_row_cnt != ref_row_cnt) {
 
514
                if (ref_row_cnt > log_row_cnt) {
 
515
                        printf("verify_database() Failed: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);
 
516
                        goto done;
 
517
                }
 
518
                
 
519
                printf("verify_database() Warnning: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);         
 
520
                printf("Possible unreferenced BLOBs\n");
 
521
        }
 
522
        
 
523
        if (log_row_cnt == ref_row_cnt) {
 
524
                for ( i = 0; i < log_row_cnt; i++) {
 
525
                        l_record = mysql_fetch_row(l_results);
 
526
                        tab_id = atol(l_record[1]);
 
527
                        r_record = mysql_fetch_row(r_results[tab_id-1]);                
 
528
                        if ((atol(l_record[0]) != atol(r_record[0])) ||
 
529
                                (atol(l_record[1]) != atol(r_record[1])) ||
 
530
                                (atol(l_record[2]) != atol(r_record[2]))) {
 
531
                                
 
532
                                printf("verify_database() Failed: in row %d, tab_id %d\n", i+1, tab_id);
 
533
                                printf("field 1:  %d =? %d\n", atol(l_record[0]), atol(r_record[0]));
 
534
                                printf("field 2:  %d =? %d\n", atol(l_record[1]), atol(r_record[1]));
 
535
                                printf("field 3:  %d =? %d\n", atol(l_record[2]), atol(r_record[2]));
 
536
                                goto done;
 
537
                        }
 
538
                                
 
539
                }
 
540
        } else { // The important thing is that there are no BLOBs in the ref tabels that are not in the log table.
 
541
 
 
542
                for (i =0; i < num_threads; i++) {
 
543
                        mysql_free_result(r_results[i]);
 
544
                        
 
545
                        sprintf(stmt, "select * from "REF_TABLE" where  blob_ref not in (select blob_ref from TransTest.translog where tab_id = %d)", i+1, i+1);
 
546
                        if (mysql_query(mysql, stmt)) 
 
547
                                report_mysql_error(mysql, __LINE__, stmt);
 
548
                                                                
 
549
                        r_results[i] = mysql_store_result(mysql);
 
550
                        if (!r_results)
 
551
                                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
 
552
                                
 
553
                        if (mysql_num_rows(r_results[i])) {
 
554
                                printf("verify_database() Failed, Missing BLOBs: %s\n", stmt);
 
555
                                goto done;
 
556
                        }
 
557
                }       
 
558
        }
 
559
        
 
560
        printf("verify_database() OK.\n");
 
561
        ok = true;
 
562
        
 
563
        done:
 
564
        
 
565
        for (i =0; i < num_threads; i++) {
 
566
                mysql_free_result(r_results[i]);
 
567
        }
 
568
        free(r_results);
 
569
        
 
570
        mysql_free_result(l_results);
 
571
        
 
572
#ifdef DEBUG    
 
573
        if (!ok) {
 
574
                trans_log->txn_DumpLog("trace.log");
 
575
        }
 
576
#endif  
 
577
        return ok;
 
578
}
 
579
 
 
580
//------------------------------------------------
 
581
void TransTestReaderThread::processTransactionLog()
 
582
{
 
583
        MSTransRec rec = {0};
 
584
        MS_TxnState state;
 
585
        char stmt[1024];
 
586
        uint32_t last_tid = 0;
 
587
        enter_();
 
588
        
 
589
        // Read in transactions from the log and update
 
590
        // the database table based on them.
 
591
        
 
592
        try_(a) {
 
593
                while (!myMustQuit && !stopit) {
 
594
                        // This will sleep while waiting for the next 
 
595
                        // completed transaction.
 
596
                        log->txn_GetNextTransaction(&rec, &state); 
 
597
                        if (myMustQuit)
 
598
                                break;
 
599
 
 
600
                        myActivity++;
 
601
#ifdef CHECK_TIDS
 
602
                        if (num_threads == 1) {
 
603
                                ASSERT( ((last_tid + 1) == rec.tr_id) || (last_tid  == rec.tr_id) || !last_tid);
 
604
                                last_tid = rec.tr_id;
 
605
                        }
 
606
#endif                  
 
607
                        if (!recovering) 
 
608
                                count++;
 
609
                        
 
610
                        switch (TRANS_TYPE(rec.tr_type)) {
 
611
                                case MS_ReferenceTxn:
 
612
                                case MS_DereferenceTxn:
 
613
                                case MS_RollBackTxn:
 
614
                                case MS_CommitTxn:
 
615
                                case MS_RecoveredTxn:
 
616
                                break;
 
617
                                default:
 
618
                                        printf("Unexpected transaction type: %d\n", rec.tr_type);
 
619
                                        exit(1);                                                        
 
620
                        }
 
621
                        
 
622
                        if (state == MS_Committed){
 
623
                                // Dereferences are applied when the transaction is commited.
 
624
                                // References are applied imediatly and removed if the transaction is rolled back.
 
625
                                if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) {
 
626
                                        sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
 
627
                                        if (mysql_query(mysql, stmt))  
 
628
                                                report_mysql_error(mysql, __LINE__, stmt);
 
629
                                } else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
 
630
                                        sprintf(stmt, "UPDATE "LOG_TABLE" SET committed = 1 WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
 
631
                                        if (mysql_query(mysql, stmt))  
 
632
                                                report_mysql_error(mysql, __LINE__, stmt);
 
633
                                }
 
634
                        } else if (state == MS_RolledBack) { 
 
635
                                //printf("ROLLBACK!\n");
 
636
                                if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
 
637
                                        sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
 
638
                                        if (mysql_query(mysql, stmt))  
 
639
                                                report_mysql_error(mysql, __LINE__, stmt);
 
640
                                }
 
641
                        } else if (state == MS_Recovered) { 
 
642
                                printf("Recovered transaction being ignored:\n");
 
643
                                printf("blob_ref = %"PRIu64", tab_id = %d, blob_id = %"PRIu64"\n\n", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
 
644
                        } else {
 
645
                                printf("Unexpected transaction state: %d\n", state);
 
646
                                exit(1);                                                        
 
647
                        }
 
648
                        
 
649
                        
 
650
                }
 
651
        }
 
652
        catch_(a) {
 
653
                self->logException();
 
654
                printf("\n\n!!!!!!!! THE TRANSACTION LOG READER DIED! !!!!!!!!!!!\n\n");
 
655
                if (!myMustQuit && !stopit)
 
656
                        exit(1);
 
657
        }
 
658
        cont_(a);
 
659
        printf("The transaction log reader shutting down.\n");
 
660
        exit_();
 
661
}
 
662
 
 
663
//------------------------------------------------
 
664
void TransTestWriterThread::generate_records()
 
665
{
 
666
 
 
667
        MS_Txn  txn_type;               
 
668
        uint64_t        blob_id;        
 
669
        uint64_t        blob_ref_id;    
 
670
        int tsize, i;
 
671
        bool do_delete;
 
672
                
 
673
        char stmt[1024];
 
674
        enter_();
 
675
 
 
676
        try_(a) {
 
677
                while (!myMustQuit && !stopit) {
 
678
                
 
679
                        myActivity++;
 
680
                        usleep(nap_time); // Give up a bit of time
 
681
                        if (myMustQuit || stopit)
 
682
                                break;
 
683
                                
 
684
                        tsize = rand() % max_transaction;
 
685
                        
 
686
                        if (mysql_autocommit(mysql, 0))
 
687
                                report_mysql_error(mysql, __LINE__, "mysql_autocommit()");
 
688
                                
 
689
                        i = 0;
 
690
                        do {
 
691
                                do_delete = ((rand() %2) == 0);
 
692
                                
 
693
                                // decide if this is an insert or delete
 
694
                                if (do_delete) {
 
695
                                        MYSQL_RES *results = NULL;            
 
696
                                        MYSQL_ROW record;
 
697
                                        int cnt;
 
698
                                        
 
699
                                        // If we are deleting then randomly select a record to delete
 
700
                                        // and delete it. 
 
701
                                        
 
702
                                        txn_type = MS_DereferenceTxn;
 
703
 
 
704
                                        sprintf(stmt, "select * from "REF_TABLE, tab_id); 
 
705
                                        if (mysql_query(mysql, stmt)) 
 
706
                                                report_mysql_error(mysql, __LINE__, stmt);
 
707
                                                
 
708
                                        results = mysql_store_result(mysql);
 
709
                                        if (!results)
 
710
                                                report_mysql_error(mysql, __LINE__, "mysql_store_result()");
 
711
                                                
 
712
                                        cnt = mysql_num_rows(results);
 
713
                                        if (!cnt)
 
714
                                                do_delete = false; // There is nothing to delete
 
715
                                        else {
 
716
                                                mysql_data_seek(results, rand()%cnt);
 
717
                                                record = mysql_fetch_row(results);
 
718
                                                        
 
719
                                                blob_ref_id = atol(record[0]);
 
720
                                                blob_id = atol(record[2]);
 
721
                                                
 
722
                                                sprintf(stmt, "DELETE FROM "REF_TABLE" WHERE blob_ref = %"PRIu64" AND blob_id = %"PRIu64"", tab_id, blob_ref_id, blob_id); 
 
723
                                                if (mysql_query(mysql, stmt))  
 
724
                                                        report_mysql_error(mysql, __LINE__, stmt);
 
725
                                                        
 
726
                                                if (mysql_affected_rows(mysql) == 0)
 
727
                                                        do_delete = false; // Another thread must have deleted the row first.
 
728
                                                else
 
729
                                                        fprintf(myLog, "DELETE %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id); 
 
730
                                        }
 
731
                                        
 
732
                                        mysql_free_result(results);
 
733
                                } 
 
734
                                
 
735
                                if (!do_delete) {
 
736
                                        blob_id = self->myTID; // Assign the tid as the blob id to help with debugging.
 
737
                                        txn_type = MS_ReferenceTxn;
 
738
                                        
 
739
                                        sprintf(stmt, "INSERT INTO "REF_TABLE" VALUES( NULL, %d, %"PRIu64", 0)", tab_id, tab_id, blob_id); 
 
740
                                        if (mysql_query(mysql, stmt)) 
 
741
                                                report_mysql_error(mysql, __LINE__, stmt);
 
742
                                                
 
743
                                        blob_ref_id = mysql_insert_id(mysql);
 
744
                                        if (!blob_ref_id)
 
745
                                                report_mysql_error(mysql, __LINE__, "mysql_insert_id() returned 0");
 
746
                                        
 
747
                                        fprintf(myLog, "INSERT %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);   
 
748
                                        // Apply the blob reference now. This will be undone if the transaction is rolled back.
 
749
                                        sprintf(stmt, "INSERT INTO "LOG_TABLE" VALUES(%"PRIu64", %d, %"PRIu64", 0)", blob_ref_id, tab_id, blob_id); 
 
750
                                        if (mysql_query(mysql, stmt)) 
 
751
                                                report_mysql_error(mysql, __LINE__, stmt);
 
752
                                }
 
753
 
 
754
                                i++;
 
755
                                count++;
 
756
                                if (i >= tsize) { //Commit the database transaction before the log transaction.
 
757
                                        bool rollback;
 
758
                                        
 
759
                                        rollback = ((tsize > 0) && ((rand() % 1000) == 0));
 
760
                                        if (rollback) {
 
761
                                                printf("Rollback\n");
 
762
                                                if (mysql_rollback(mysql)) // commit the staement to the database,
 
763
                                                        report_mysql_error(mysql, __LINE__, "mysql_rollback()");        
 
764
                                                fprintf(myLog, "Rollback %"PRIu32"\n", self->myTID);    
 
765
                                                log->txn_LogTransaction(MS_RollBackTxn);
 
766
                                        } else {
 
767
                                                if (mysql_commit(mysql)) // commit the staement to the database,
 
768
                                                        report_mysql_error(mysql, __LINE__, "mysql_commit()");  
 
769
                                                fprintf(myLog, "Commit %"PRIu32"\n", self->myTID);      
 
770
                                                log->txn_LogTransaction(txn_type, true, A_DB_ID, tab_id, blob_id, blob_ref_id);
 
771
                                        }
 
772
                                } else
 
773
                                        log->txn_LogTransaction(txn_type, false, A_DB_ID, tab_id, blob_id, blob_ref_id);
 
774
                                                                
 
775
                        } while ( i < tsize);
 
776
                                                
 
777
                }
 
778
        }
 
779
        
 
780
        catch_(a) {
 
781
                self->logException();
 
782
                printf("\n\nA writer thread for table %d died! \n\n", tab_id);
 
783
                if (i == tsize) {
 
784
                        printf(" It is possible that the last %d operations on table %d were committed to the database but not to the log.\n", tsize, tab_id);
 
785
                }
 
786
                if (!myMustQuit && !stopit)
 
787
                        exit(1);
 
788
        }
 
789
        cont_(a);
 
790
        printf("Writer thread for table %d is shutting down.\n", tab_id);
 
791
        exit_();
 
792
}
 
793
 
 
794
// SELECT * FROM TransTest.translog where  blob_ref not in (select blob_ref from TransTest.transref)
 
795
// SELECT * FROM TransTest.transref_1 where  blob_ref not in (select blob_ref from TransTest.translog where tab_id = 1)
 
796
// SELECT * FROM TransTest.translog where  tab_id = 1 AND blob_ref not in (select blob_ref from TransTest.transref_1)
 
797
// select count(*) from TransTest.translog where committed = 1
 
798
//---------------------------------     
 
799
int main (int argc, const char * argv[]) 
 
800
{
 
801
        MYSQL *mysql;
 
802
        TransTestWriterThread **writer = NULL;
 
803
        int rtc = 1;
 
804
        
 
805
        process_args(argc, argv);
 
806
        
 
807
        mysql = new_connection(true);
 
808
        
 
809
        if (recreate)
 
810
                init_database(mysql, num_threads);
 
811
                
 
812
        init_env();
 
813
        enter_();
 
814
        
 
815
        if (dump_log) {
 
816
                printf("LOG dumped\n");
 
817
                exit(1);
 
818
        }
 
819
        
 
820
        TransReader = TransTestReaderThread::newTransTestReaderThread(trans_log);
 
821
        push_(TransReader);
 
822
        TransReader->recovering = true;
 
823
        TransReader->start();
 
824
        
 
825
        // wait until the recovery is complete.
 
826
        while (trans_log->txn_GetNumRecords())
 
827
                usleep(100);
 
828
                
 
829
        TransReader->recovering = false;
 
830
        
 
831
        if (log_size)
 
832
                trans_log->txn_SetLogSize(log_size);
 
833
                
 
834
        if (cache_size)
 
835
                trans_log->txn_SetCacheSize(cache_size);
 
836
                
 
837
        if (revover_only) {
 
838
                TransReader->stopit = true;
 
839
                if (verify_database(mysql))
 
840
                        rtc = 0;
 
841
                goto done;
 
842
        }
 
843
        
 
844
        try_(a) {
 
845
                writer = (TransTestWriterThread **) cs_malloc(num_threads * sizeof(TransTestWriterThread *));
 
846
                for (int i = 0; i < num_threads; i++) {
 
847
                        TransTestWriterThread *wt = TransTestWriterThread::newTransTestWriterThread(i+1);
 
848
                        wt->start();
 
849
                        writer[i] = wt;
 
850
                }
 
851
        
 
852
                printf("Timeout: %d seconds\n", timeout); 
 
853
                timeout += time(NULL);
 
854
                int header = 0;
 
855
                while (timeout > time(NULL)) {
 
856
                        MSTransStatsRec stats;
 
857
                        self->sleep(1000);
 
858
                        trans_log->txn_GetStats(&stats);
 
859
                        
 
860
                        
 
861
                        if (!(header%20)) {
 
862
                                for (int i = 0; i < num_threads; i++) {                         
 
863
                                        if (writer[i]->myActivity == 0) {
 
864
                                                printf("Writer thread %d HUNG!!!\n", i);
 
865
                                        }
 
866
                                        writer[i]->myActivity = 0;
 
867
                                }
 
868
                                
 
869
                                if (TransReader->myActivity == 0) {
 
870
                                        printf("Reader thread HUNG!!!\n");
 
871
                                }
 
872
                                TransReader->myActivity = 0;
 
873
                                        
 
874
                                printf("%s | %s | %s | %s | %s | %s | %s | %s\n", "LogSize", "Full", "MaxSize", "Overflows", "Overflowing", "CacheSize", "Cache Used", "Cache Hit");
 
875
                        }
 
876
                        header++;
 
877
                        //printf("Writes: %d \t\t Reads: %d \t%d \t start: %"PRIu64"\t\t eol:%"PRIu64"\n", count, TransReader->count, count - TransReader->count, trans_log->txn_Start, trans_log->txn_EOL);
 
878
                        printf("%7llu | %3d%% | %7llu | %9d | %11s | %9d | %9d%% | %9d%%\n",// | \t\t\t%"PRIu64" \t%"PRIu64"\n", 
 
879
                                stats.ts_LogSize,
 
880
                                stats.ts_PercentFull,
 
881
                                stats.ts_MaxSize,
 
882
                                stats.ts_OverflowCount,
 
883
                                (stats.ts_IsOverflowing)?"Over Flow": "   ---   ",
 
884
                                stats.ts_TransCacheSize,
 
885
                                stats.ts_PercentTransCacheUsed,
 
886
                                stats.ts_PercentCacheHit//, trans_log->txn_Start, trans_log->txn_EOL
 
887
                                );
 
888
                                
 
889
                                if (stats.ts_IsOverflowing && overflow_crash) {
 
890
                                        printf("Simulating crash while in overflow\n");
 
891
                                        exit(1);
 
892
                                }
 
893
                }
 
894
 
 
895
#ifdef CRASH_TEST               
 
896
                if (crash_site) {
 
897
                        printf("Crashing at crash site %d\n", crash_site);
 
898
                        trans_test_crash_point = crash_site;
 
899
                        // set the crash site and wait to die.
 
900
                        while(1)
 
901
                                self->sleep(1000);
 
902
                }
 
903
#endif
 
904
                
 
905
                printf("Shutting down the writer threads:\n");
 
906
                for (int i = 0; i < num_threads; i++) {
 
907
                        writer[i]->stopit = true;
 
908
                }
 
909
                
 
910
                TransReader->stopit = true;
 
911
                // Give the writers a chance to shutdown by themselves.
 
912
                int cnt = 100;
 
913
                while (cnt) {
 
914
                        int i;
 
915
                        for (i = 0; i < num_threads && writer[i]->finished; i++);
 
916
                        if (i == num_threads && TransReader->finished)
 
917
                                break;
 
918
                        self->sleep(10);        
 
919
                        cnt--;                  
 
920
                }
 
921
                
 
922
                for (int i = 0; i < num_threads; i++) {
 
923
                        writer[i]->stop();
 
924
                }
 
925
                
 
926
        }
 
927
        rtc = 0;
 
928
        catch_(a) {
 
929
                printf("Main thread abort.\n");
 
930
                self->logException();
 
931
        }
 
932
        cont_(a);
 
933
        if (writer) {
 
934
                for (int i = 0; i < num_threads; i++) {
 
935
                        writer[i]->stop();
 
936
                        writer[i]->release();
 
937
                }
 
938
                cs_free(writer);
 
939
        }
 
940
                
 
941
done:
 
942
        TransReader->stop();
 
943
        release_(TransReader);
 
944
        
 
945
        outer_();
 
946
        
 
947
        thread_list->stopAllThreads();
 
948
        deinit_env();
 
949
        mysql_close(mysql);
 
950
        exit(rtc);
 
951
}
 
952
 
 
953
#endif // UNIT_TEST