~drizzle-trunk/drizzle/development

1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
2
 *
3
 * PrimeBase Media Stream for MySQL
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License
16
 * along with this program; if not, write to the Free Software
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 *
19
 * Barry Leslie
20
 *
21
 * 2009-06-17
22
 *
23
 * H&G2JCtL
24
 *
25
 * PBMS transaction handling test driver.
26
 *
27
 * This is a test driver for the PBMS transaction log. It uses 2 tables in a database and
28
 * inserts transaction records into 1 while writing them to the transaction log. The transaction
29
 * log reader thread reads the transactions from the log and writes them to the second table.
30
 * After a recovery the 2 tables should be identical.
31
 *
32
 * Built in crash points can be triggered to test that the recovery works correctly.
33
 *
34
 */
35
 
36
#ifdef UNIT_TEST
37
38
#include <stdlib.h>
39
#include <stdio.h>
40
#include <unistd.h>
41
#include <string.h>
42
#include <ctype.h>
43
#include <inttypes.h>
44
45
#include "CSConfig.h"
46
#include "CSGlobal.h"
47
#include "CSThread.h"
48
#include "CSStrUtil.h"
49
#include "CSStorage.h"
50
51
#include "TransCache_ms.h"
52
#include "TransLog_ms.h"
53
54
#include "mysql.h"
55
56
#define CREATE_TABLE_BODY "\
57
 (\
58
  blob_ref INT NOT NULL AUTO_INCREMENT,\
59
  tab_id INT NOT NULL,\
60
  blob_id BIGINT NOT NULL, \
61
  committed BOOLEAN NOT NULL DEFAULT 0, \
62
  PRIMARY KEY (blob_ref, tab_id)\
63
)\
64
ENGINE = INNODB\
65
"
66
#ifdef LOG_TABLE
67
#undef LOG_TABLE
68
#endif
69
70
#define LOG_TABLE	"translog"
71
#define REF_TABLE	"transref_%d"
72
#define MAX_THREADS	20
73
74
#define A_DB_ID 123
75
76
#define TEST_DATABASE_NAME "TransTest"
77
static const char *user_name = "root";
78
static const char *user_passwd = "";
79
static int	port = 3306;
80
static const char *host = "localhost";
81
static int nap_time = 1000;
82
static int max_transaction = 10; // The maximum number of records generated per transaction
83
static bool dump_log = false, overflow_crash = false;
84
static int crash_site = 0;		// The location to crash at.
85
static int num_threads = 1;		// The number of writer threads.
86
//static int rate = 1000;			// The maximum transactions per second to allow.
87
static time_t timeout = 60;		// How long to run for before crashing or shutting down.
88
static bool revover_only = false;
89
static bool recreate = false;
90
91
static uint32_t cache_size = 0, log_size = 0;
92
93
static MSTrans *trans_log;
94
95
static CSThreadList *thread_list;
96
97
static MYSQL *new_connection(bool check_for_db);
98
99
static CSThread *main_thread;
100
101
//------------------------------------------------
102
class TransTestThread : public CSDaemon {
103
public:
104
	TransTestThread(): 
105
		CSDaemon(thread_list),
106
		count(0),
107
		myActivity(0),
108
		log(NULL),
109
		stopit(false),
110
		finished(false),
111
		mysql(NULL)
112
		{}
113
114
	 ~TransTestThread()
115
	{
116
		if (log)
117
			log->release();
118
			
119
		if (mysql)
120
			mysql_close(mysql);
121
	}
122
	 
123
	MSTrans *log;
124
	MYSQL *mysql;
125
	uint32_t count;
126
	uint32_t myActivity;
127
128
	bool stopit;
129
	bool finished;
130
	
131
	virtual bool doWork() {return true;}	
132
};
133
134
//------------------------------------------------
135
class TransTestWriterThread : public TransTestThread {
136
public:
137
	TransTestWriterThread():TransTestThread() {}
138
	
139
	uint32_t tab_id;
140
	FILE	*myLog;
141
	
142
	void generate_records();
143
	bool doWork() 
144
	{
145
		generate_records();
146
		finished = true;
147
		return true;
148
	}
149
	
150
	static TransTestWriterThread *newTransTestWriterThread(uint32_t id)
151
	{
152
		TransTestWriterThread *tt;
153
		enter_();
154
		
155
		
156
		new_(tt, TransTestWriterThread());
157
		
158
		char name[32];
159
		sprintf(name, "write_%d.log", id);
160
		if (recreate)
161
			tt->myLog = fopen(name, "w+");
162
		else {
163
			tt->myLog = fopen(name, "a+");
164
			fprintf(tt->myLog, "====================================================\n");
165
		}
166
		
167
		tt->tab_id = id ;
168
		tt->mysql = new_connection(false);
169
		tt->log = trans_log;
170
		trans_log->retain();
171
		
172
		return_(tt); 
173
	}
174
	
175
	
176
};
177
178
//------------------------------------------------
179
class TransTestReaderThread : public TransTestThread {
180
public:
181
	TransTestReaderThread():TransTestThread(){}
182
	
183
	bool recovering;
184
	void processTransactionLog();
185
	bool doWork() 
186
	{
187
		processTransactionLog();
188
		return true;
189
	}
190
	
191
	static TransTestReaderThread *newTransTestReaderThread(MSTrans *log)
192
	{
193
		TransTestReaderThread *tt;
194
		enter_();
195
		
196
		new_(tt, TransTestReaderThread());
197
		tt->mysql = new_connection(false);
198
		tt->log = log;
199
		tt->log->retain();
200
		
201
		tt->log->txn_SetReader(tt); // The reader daemon is passed in unreferenced.
202
		tt->recovering = false;
203
		return_(tt); 
204
	}
205
	
206
	bool rec_found(uint64_t id, uint32_t tab_id) 
207
	{
208
		char stmt[100];
209
		MYSQL_RES *results = NULL;            
210
		bool found;
211
		
212
		sprintf(stmt, "SELECT blob_ref FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %"PRIu32"", id, tab_id); 
213
		if (mysql_query(mysql, stmt)) {
214
			printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
215
			printf("%s\n", stmt);
216
			exit(1);
217
		}
218
		
219
		
220
		results = mysql_store_result(mysql);
221
		if (!results){
222
			printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
223
			exit(1);
224
		}
225
226
		found = (mysql_num_rows(results) == 1);		
227
		mysql_free_result(results);
228
		
229
		return found;
230
		
231
	}
232
	
233
	
234
};
235
236
TransTestReaderThread *TransReader;
237
//------------------------------------------------
238
// Print the current error of 'mysql' together with the source line that
// detected it and, when 'msg' is not NULL, an extra line of context (usually
// the failing statement). Never returns: the process exits with status 1.
static void report_mysql_error(MYSQL *mysql, int line, const char *msg)
{
	printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), line);

	if (msg != NULL) {
		printf("%s\n", msg);
	}

	exit(1);
}
245
246
247
//------------------------------------------------
248
// Open a new connection to the test server and select the test database.
//
// check_for_db: when true, the test database is created first if
//               "show databases" does not list it.
//
// Returns the connected handle. Any failure terminates the process.
static MYSQL *new_connection(bool check_for_db)
{
	MYSQL *conn = mysql_init(NULL);

	if (conn == NULL) {
		printf( "mysql_init() failed.\n");
		exit(1);
	}

	if (!mysql_real_connect(conn, host, user_name, user_passwd, NULL, port, NULL, 0))
		report_mysql_error(conn, __LINE__, "mysql_real_connect()");

	if (check_for_db) {
		MYSQL_RES *listing;

		if (mysql_query(conn, "show databases like \"" TEST_DATABASE_NAME "\"") != 0)
			report_mysql_error(conn, __LINE__, "show databases like \"" TEST_DATABASE_NAME "\"");

		listing = mysql_store_result(conn);
		if (listing == NULL)
			report_mysql_error(conn, __LINE__, "mysql_store_result()");

		// Anything other than exactly one match means the database is missing.
		if (mysql_num_rows(listing) != 1) {
			if (mysql_query(conn, "create database " TEST_DATABASE_NAME ) != 0)
				report_mysql_error(conn, __LINE__, "create database " TEST_DATABASE_NAME );
		}
		mysql_free_result(listing);
	}

	if (mysql_query(conn, "use " TEST_DATABASE_NAME ) != 0)
		report_mysql_error(conn, __LINE__, "use " TEST_DATABASE_NAME );

	return conn;
}
284
285
//------------------------------------------------
286
// Start from a clean slate: remove the transaction log file and recreate the
// log table plus the reference tables numbered cnt down to 1.
// Failures of the "drop table" statements are ignored; a failed
// "create table" terminates the process.
static void init_database(MYSQL *mysql, int cnt)
{
	char stmt[1024];

	unlink("ms-trans-log.dat");

	mysql_query(mysql, "drop table if exists " LOG_TABLE  ";");
	if (mysql_query(mysql, "create table " LOG_TABLE  CREATE_TABLE_BODY ";") != 0)
		report_mysql_error(mysql, __LINE__, NULL);

	for (int table_no = cnt; table_no != 0; table_no--) {
		sprintf(stmt, "drop table if exists " REF_TABLE  ";", table_no);
		mysql_query(mysql, stmt);

		sprintf(stmt, "create table " REF_TABLE  CREATE_TABLE_BODY ";", table_no);
		if (mysql_query(mysql, stmt) != 0)
			report_mysql_error(mysql, __LINE__, NULL);
	}
}
309
310
311
//------------------------------------------------
312
static void display_help(const char *app)
313
{
314
	printf("\nUsage:\n");
315
	printf("%s -help | -r  [-t<num_threads>] | -d | [-n] [-sc <cache_size>] [-sl <log_size>] [-c <crash_site>]  [-t<num_threads>] [<timeout>]\n\n", app);
316
	
317
	printf("-r: Test recovery after a crash or shutdown.\n");
318
	printf("-d: Dump the transaction log.\n");
319
	printf("-n: Recreate the tables and recovery log.\n");
320
	printf("-c <crash_site>: Crash at this location rather than shutting down. Max = %d\n", MAX_CRASH_POINT+1);
321
	printf("-t<num_threads>: The number of writer threads to use, default is %d.\n", num_threads);
322
	//printf("-r<rate>: The number af records to be inserted per second, default is %d.\n", rate);
323
	printf("<timeout>: The number seconds the test should run before shuttingdown or crashing, default is %d.\n\n", timeout);
324
	exit(1);
325
}
326
327
//---------------------------------	
328
static void process_args(int argc, const char * argv[])
329
{
330
	if (argc < 2)
331
		return;
332
		
333
	for (int i = 1; i < argc; ) {
334
		if ( argv[i][0] != '-') { // Must be timeout
335
			timeout = atoi(argv[i]);
336
			i++;
337
			if ((i != argc) || !timeout)
338
				display_help(argv[0]);
339
		} else {
340
			switch (argv[i][1]) {
341
				case 'h':
342
					display_help(argv[0]);
343
					break;
344
					
345
				case 'r':
346
					if (argc > 4 || argv[i][2])
347
						display_help(argv[0]);
348
					revover_only = true;
349
					i++;
350
					break;
351
					
352
				case 'd':
353
					if (argc != 2 || argv[i][2])
354
						display_help(argv[0]);
355
					dump_log = true;
356
					i++;
357
					break;
358
					
359
				case 'n':
360
					if (argv[i][2])
361
						display_help(argv[0]);
362
					recreate = true;
363
					i++;
364
					break;
365
					
366
				case 'c':
367
					if (argv[i][2])
368
						display_help(argv[0]);
369
					i++;
370
					crash_site = atoi(argv[i]);
371
					if (crash_site == (MAX_CRASH_POINT + 1))
372
						overflow_crash = true;
373
					else if ((!crash_site) || (crash_site >  MAX_CRASH_POINT))
374
						display_help(argv[0]);
375
					i++;
376
					break;
377
					
378
				case 's': {
379
						uint32_t size;
380
						
381
						size = atol(argv[i+1]);
382
						if (!size)
383
							display_help(argv[0]);
384
							
385
						if (argv[i][2] == 'c') 
386
							cache_size = size;
387
						else if (argv[i][2] == 'l')
388
							log_size = size;
389
						else 
390
							display_help(argv[0]);
391
						
392
						i+=2;
393
					}
394
					break;
395
					
396
				case 't':
397
					if (argv[i][2])
398
						display_help(argv[0]);
399
					i++;
400
					num_threads = atoi(argv[i]);
401
					if (!num_threads)
402
						display_help(argv[0]);
403
					i++;
404
					break;
405
/*					
406
				case 'r':
407
					i++;
408
					rate = atoi(argv[i]);
409
					if (!rate)
410
						display_help(argv[0]);
411
					i++;
412
					break;
413
*/
414
				default:
415
					display_help(argv[0]);
416
			}
417
			
418
		}
419
	}
420
}
421
422
//---------------------------------	
423
static void init_env()
424
{
425
	cs_init_memory();
426
	CSThread::startUp();
427
	if (!(main_thread = CSThread::newCSThread())) {
428
		CSException::logOSError(CS_CONTEXT, ENOMEM);
429
		exit(1);
430
	}
431
	
432
	CSThread::setSelf(main_thread);
433
	
434
	enter_();
435
	try_(a) {
436
		trans_log = MSTrans::txn_NewMSTrans("./ms-trans-log.dat", /*dump_log*/ true);
437
		new_(thread_list, CSThreadList()); 
438
	}
439
	catch_(a) {
440
		self->logException();
441
		CSThread::shutDown();
442
		exit(1);
443
	}
444
	cont_(a);
445
	
446
}
447
//---------------------------------	
448
static void deinit_env()
449
{
450
	if (thread_list) {
451
		thread_list->release();
452
		thread_list = NULL;
453
	}
454
	
455
	if (trans_log) {
456
		trans_log->release();
457
		trans_log = NULL;
458
	}
459
	
460
	if (main_thread) {
461
		main_thread->release();
462
		main_thread = NULL;
463
	}
464
	
465
	CSThread::shutDown();
466
	cs_exit_memory();
467
}
468
//---------------------------------	
469
// Check that the log table agrees with the writers' reference tables.
//
// - Reports the number of uncommitted references left in the log table.
// - Fails if the reference tables hold more rows than the log table.
// - If the row counts match, the rows are compared one by one in blob_ref
//   order (the log row's tab_id selects the reference table to read from).
// - If the log table holds more rows, a warning is printed and the check is
//   relaxed to: no reference table row may be missing from the log table.
//
// Returns true when the verification passes. MySQL errors terminate the
// process via report_mysql_error().
static bool verify_database(MYSQL *mysql)
{
	const char *uncommitted_query = "select * from " LOG_TABLE " where committed = 0 order by blob_ref";
	const char *log_query = "select * from " LOG_TABLE " order by blob_ref";
	MYSQL_RES **r_results, *l_results = NULL;
	MYSQL_ROW r_record, l_record;
	bool ok = false;
	int i, log_row_cnt, ref_row_cnt = 0, tab_id;
	char stmt[1024];

	// Zero-filled so that the cleanup code can tell which slots hold a result.
	r_results = (MYSQL_RES **) calloc(num_threads, sizeof(MYSQL_RES *));
	if (!r_results) {
		printf("verify_database() Failed: out of memory\n");
		return false;
	}

	if (mysql_query(mysql, uncommitted_query))
		report_mysql_error(mysql, __LINE__, uncommitted_query);

	l_results = mysql_store_result(mysql);
	if (!l_results)
		report_mysql_error(mysql, __LINE__, "mysql_store_result()");

	log_row_cnt = (int) mysql_num_rows(l_results);
	mysql_free_result(l_results);
	l_results = NULL;
	if (log_row_cnt)
		printf("Uncommitted references: %d\n", log_row_cnt);

	//---------
	// Load every reference table and total up the rows.
	for (i = 0; i < num_threads; i++) {
		snprintf(stmt, sizeof(stmt), "select * from " REF_TABLE " order by blob_ref", i+1);
		if (mysql_query(mysql, stmt))
			report_mysql_error(mysql, __LINE__, stmt);

		r_results[i] = mysql_store_result(mysql);
		if (!r_results[i])
			report_mysql_error(mysql, __LINE__, "mysql_store_result()");

		ref_row_cnt += (int) mysql_num_rows(r_results[i]);
	}
	//---------
	if (mysql_query(mysql, log_query))
		report_mysql_error(mysql, __LINE__, log_query);

	l_results = mysql_store_result(mysql);
	if (!l_results)
		report_mysql_error(mysql, __LINE__, "mysql_store_result()");

	log_row_cnt = (int) mysql_num_rows(l_results);

	if (log_row_cnt != ref_row_cnt) {
		if (ref_row_cnt > log_row_cnt) {
			printf("verify_database() Failed: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);
			goto done;
		}

		printf("verify_database() Warnning: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);
		printf("Possible unreferenced BLOBs\n");
	}

	if (log_row_cnt == ref_row_cnt) {
		for ( i = 0; i < log_row_cnt; i++) {
			long long l_ref, l_tab, l_blob;
			long long r_ref, r_tab, r_blob;

			l_record = mysql_fetch_row(l_results);
			if (!l_record) {
				printf("verify_database() Failed: could not fetch log row %d\n", i+1);
				goto done;
			}

			// The tab_id indexes r_results, so it must name one of our tables.
			tab_id = atoi(l_record[1]);
			if (tab_id < 1 || tab_id > num_threads) {
				printf("verify_database() Failed: in row %d, unexpected tab_id %d\n", i+1, tab_id);
				goto done;
			}

			r_record = mysql_fetch_row(r_results[tab_id-1]);
			if (!r_record) {
				printf("verify_database() Failed: in row %d, no row left in reference table %d\n", i+1, tab_id);
				goto done;
			}

			// blob_id is a BIGINT column, so compare as 64 bit values.
			l_ref = strtoll(l_record[0], NULL, 10);
			l_tab = strtoll(l_record[1], NULL, 10);
			l_blob = strtoll(l_record[2], NULL, 10);
			r_ref = strtoll(r_record[0], NULL, 10);
			r_tab = strtoll(r_record[1], NULL, 10);
			r_blob = strtoll(r_record[2], NULL, 10);

			if ((l_ref != r_ref) || (l_tab != r_tab) || (l_blob != r_blob)) {
				printf("verify_database() Failed: in row %d, tab_id %d\n", i+1, tab_id);
				printf("field 1:  %lld =? %lld\n", l_ref, r_ref);
				printf("field 2:  %lld =? %lld\n", l_tab, r_tab);
				printf("field 3:  %lld =? %lld\n", l_blob, r_blob);
				goto done;
			}
		}
	} else { // The important thing is that there are no BLOBs in the ref tables that are not in the log table.

		for (i = 0; i < num_threads; i++) {
			mysql_free_result(r_results[i]);
			r_results[i] = NULL;

			snprintf(stmt, sizeof(stmt), "select * from " REF_TABLE " where  blob_ref not in (select blob_ref from " TEST_DATABASE_NAME "." LOG_TABLE " where tab_id = %d)", i+1, i+1);
			if (mysql_query(mysql, stmt))
				report_mysql_error(mysql, __LINE__, stmt);

			r_results[i] = mysql_store_result(mysql);
			if (!r_results[i])
				report_mysql_error(mysql, __LINE__, "mysql_store_result()");

			if (mysql_num_rows(r_results[i])) {
				printf("verify_database() Failed, Missing BLOBs: %s\n", stmt);
				goto done;
			}
		}
	}

	printf("verify_database() OK.\n");
	ok = true;

	done:

	for (i = 0; i < num_threads; i++) {
		if (r_results[i])
			mysql_free_result(r_results[i]);
	}
	free(r_results);

	if (l_results)
		mysql_free_result(l_results);

#ifdef DEBUG
	if (!ok) {
		trans_log->txn_DumpLog("trace.log");
	}
#endif
	return ok;
}
579
580
//------------------------------------------------
581
void TransTestReaderThread::processTransactionLog()
582
{
583
	MSTransRec rec = {0};
584
	MS_TxnState state;
585
	char stmt[1024];
586
	uint32_t last_tid = 0;
587
	enter_();
588
	
589
	// Read in transactions from the log and update
590
	// the database table based on them.
591
	
592
	try_(a) {
593
		while (!myMustQuit && !stopit) {
594
			// This will sleep while waiting for the next 
595
			// completed transaction.
596
			log->txn_GetNextTransaction(&rec, &state); 
597
			if (myMustQuit)
598
				break;
599
600
			myActivity++;
601
#ifdef CHECK_TIDS
602
			if (num_threads == 1) {
603
				ASSERT( ((last_tid + 1) == rec.tr_id) || (last_tid  == rec.tr_id) || !last_tid);
604
				last_tid = rec.tr_id;
605
			}
606
#endif			
607
			if (!recovering) 
608
				count++;
609
			
610
			switch (TRANS_TYPE(rec.tr_type)) {
611
				case MS_ReferenceTxn:
612
				case MS_DereferenceTxn:
613
				case MS_RollBackTxn:
614
				case MS_CommitTxn:
615
				case MS_RecoveredTxn:
616
				break;
617
				default:
618
					printf("Unexpected transaction type: %d\n", rec.tr_type);
619
					exit(1);							
620
			}
621
			
622
			if (state == MS_Committed){
623
				// Dereferences are applied when the transaction is commited.
624
				// References are applied imediatly and removed if the transaction is rolled back.
625
				if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) {
626
					sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
627
					if (mysql_query(mysql, stmt))  
628
						report_mysql_error(mysql, __LINE__, stmt);
629
				} else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
630
					sprintf(stmt, "UPDATE "LOG_TABLE" SET committed = 1 WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
631
					if (mysql_query(mysql, stmt))  
632
						report_mysql_error(mysql, __LINE__, stmt);
633
				}
634
			} else if (state == MS_RolledBack) { 
635
				//printf("ROLLBACK!\n");
636
				if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
637
					sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
638
					if (mysql_query(mysql, stmt))  
639
						report_mysql_error(mysql, __LINE__, stmt);
640
				}
641
			} else if (state == MS_Recovered) { 
642
				printf("Recovered transaction being ignored:\n");
643
				printf("blob_ref = %"PRIu64", tab_id = %d, blob_id = %"PRIu64"\n\n", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
644
			} else {
645
				printf("Unexpected transaction state: %d\n", state);
646
				exit(1);							
647
			}
648
			
649
			
650
		}
651
	}
652
	catch_(a) {
653
		self->logException();
654
		printf("\n\n!!!!!!!! THE TRANSACTION LOG READER DIED! !!!!!!!!!!!\n\n");
655
		if (!myMustQuit && !stopit)
656
			exit(1);
657
	}
658
	cont_(a);
659
	printf("The transaction log reader shutting down.\n");
660
	exit_();
661
}
662
663
//------------------------------------------------
664
void TransTestWriterThread::generate_records()
665
{
666
667
	MS_Txn	txn_type;		
668
	uint64_t	blob_id;	
669
	uint64_t	blob_ref_id;	
670
	int tsize, i;
671
	bool do_delete;
672
		
673
	char stmt[1024];
674
	enter_();
675
676
	try_(a) {
677
		while (!myMustQuit && !stopit) {
678
		
679
			myActivity++;
680
			usleep(nap_time); // Give up a bit of time
681
			if (myMustQuit || stopit)
682
				break;
683
				
684
			tsize = rand() % max_transaction;
685
			
686
			if (mysql_autocommit(mysql, 0))
687
				report_mysql_error(mysql, __LINE__, "mysql_autocommit()");
688
				
689
			i = 0;
690
			do {
691
				do_delete = ((rand() %2) == 0);
692
				
693
				// decide if this is an insert or delete
694
				if (do_delete) {
695
					MYSQL_RES *results = NULL;            
696
					MYSQL_ROW record;
697
					int cnt;
698
					
699
					// If we are deleting then randomly select a record to delete
700
					// and delete it. 
701
					
702
					txn_type = MS_DereferenceTxn;
703
704
					sprintf(stmt, "select * from "REF_TABLE, tab_id); 
705
					if (mysql_query(mysql, stmt)) 
706
						report_mysql_error(mysql, __LINE__, stmt);
707
						
708
					results = mysql_store_result(mysql);
709
					if (!results)
710
						report_mysql_error(mysql, __LINE__, "mysql_store_result()");
711
						
712
					cnt = mysql_num_rows(results);
713
					if (!cnt)
714
						do_delete = false; // There is nothing to delete
715
					else {
716
						mysql_data_seek(results, rand()%cnt);
717
						record = mysql_fetch_row(results);
718
							
719
						blob_ref_id = atol(record[0]);
720
						blob_id = atol(record[2]);
721
						
722
						sprintf(stmt, "DELETE FROM "REF_TABLE" WHERE blob_ref = %"PRIu64" AND blob_id = %"PRIu64"", tab_id, blob_ref_id, blob_id); 
723
						if (mysql_query(mysql, stmt))  
724
							report_mysql_error(mysql, __LINE__, stmt);
725
							
726
						if (mysql_affected_rows(mysql) == 0)
727
							do_delete = false; // Another thread must have deleted the row first.
728
						else
729
							fprintf(myLog, "DELETE %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id); 
730
					}
731
					
732
					mysql_free_result(results);
733
				} 
734
				
735
				if (!do_delete) {
736
					blob_id = self->myTID; // Assign the tid as the blob id to help with debugging.
737
					txn_type = MS_ReferenceTxn;
738
					
739
					sprintf(stmt, "INSERT INTO "REF_TABLE" VALUES( NULL, %d, %"PRIu64", 0)", tab_id, tab_id, blob_id); 
740
					if (mysql_query(mysql, stmt)) 
741
						report_mysql_error(mysql, __LINE__, stmt);
742
						
743
					blob_ref_id = mysql_insert_id(mysql);
744
					if (!blob_ref_id)
745
						report_mysql_error(mysql, __LINE__, "mysql_insert_id() returned 0");
746
					
747
					fprintf(myLog, "INSERT %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);	
748
					// Apply the blob reference now. This will be undone if the transaction is rolled back.
749
					sprintf(stmt, "INSERT INTO "LOG_TABLE" VALUES(%"PRIu64", %d, %"PRIu64", 0)", blob_ref_id, tab_id, blob_id); 
750
					if (mysql_query(mysql, stmt)) 
751
						report_mysql_error(mysql, __LINE__, stmt);
752
				}
753
754
				i++;
755
				count++;
756
				if (i >= tsize) { //Commit the database transaction before the log transaction.
757
					bool rollback;
758
					
759
					rollback = ((tsize > 0) && ((rand() % 1000) == 0));
760
					if (rollback) {
761
						printf("Rollback\n");
762
						if (mysql_rollback(mysql)) // commit the staement to the database,
763
							report_mysql_error(mysql, __LINE__, "mysql_rollback()");	
764
						fprintf(myLog, "Rollback %"PRIu32"\n", self->myTID);	
765
						log->txn_LogTransaction(MS_RollBackTxn);
766
					} else {
767
						if (mysql_commit(mysql)) // commit the staement to the database,
768
							report_mysql_error(mysql, __LINE__, "mysql_commit()");	
769
						fprintf(myLog, "Commit %"PRIu32"\n", self->myTID);	
770
						log->txn_LogTransaction(txn_type, true, A_DB_ID, tab_id, blob_id, blob_ref_id);
771
					}
772
				} else
773
					log->txn_LogTransaction(txn_type, false, A_DB_ID, tab_id, blob_id, blob_ref_id);
774
								
775
			} while ( i < tsize);
776
						
777
		}
778
	}
779
	
780
	catch_(a) {
781
		self->logException();
782
		printf("\n\nA writer thread for table %d died! \n\n", tab_id);
783
		if (i == tsize) {
784
			printf(" It is possible that the last %d operations on table %d were committed to the database but not to the log.\n", tsize, tab_id);
785
		}
786
		if (!myMustQuit && !stopit)
787
			exit(1);
788
	}
789
	cont_(a);
790
	printf("Writer thread for table %d is shutting down.\n", tab_id);
791
	exit_();
792
}
793
794
// SELECT * FROM TransTest.translog where  blob_ref not in (select blob_ref from TransTest.transref)
795
// SELECT * FROM TransTest.transref_1 where  blob_ref not in (select blob_ref from TransTest.translog where tab_id = 1)
796
// SELECT * FROM TransTest.translog where  tab_id = 1 AND blob_ref not in (select blob_ref from TransTest.transref_1)
797
// select count(*) from TransTest.translog where committed = 1
798
//---------------------------------	
799
int main (int argc, const char * argv[]) 
800
{
801
	MYSQL *mysql;
802
	TransTestWriterThread **writer = NULL;
803
	int rtc = 1;
804
	
805
	process_args(argc, argv);
806
	
807
	mysql = new_connection(true);
808
	
809
	if (recreate)
810
		init_database(mysql, num_threads);
811
		
812
	init_env();
813
	enter_();
814
	
815
	if (dump_log) {
816
		printf("LOG dumped\n");
817
		exit(1);
818
	}
819
	
820
	TransReader = TransTestReaderThread::newTransTestReaderThread(trans_log);
821
	push_(TransReader);
822
	TransReader->recovering = true;
823
	TransReader->start();
824
	
825
	// wait until the recovery is complete.
826
	while (trans_log->txn_GetNumRecords())
827
		usleep(100);
828
		
829
	TransReader->recovering = false;
830
	
831
	if (log_size)
832
		trans_log->txn_SetLogSize(log_size);
833
		
834
	if (cache_size)
835
		trans_log->txn_SetCacheSize(cache_size);
836
		
837
	if (revover_only) {
838
		TransReader->stopit = true;
839
		if (verify_database(mysql))
840
			rtc = 0;
841
		goto done;
842
	}
843
	
844
	try_(a) {
845
		writer = (TransTestWriterThread **) cs_malloc(num_threads * sizeof(TransTestWriterThread *));
846
		for (int i = 0; i < num_threads; i++) {
847
			TransTestWriterThread *wt = TransTestWriterThread::newTransTestWriterThread(i+1);
848
			wt->start();
849
			writer[i] = wt;
850
		}
851
	
852
		printf("Timeout: %d seconds\n", timeout); 
853
		timeout += time(NULL);
854
		int header = 0;
855
		while (timeout > time(NULL)) {
856
			MSTransStatsRec stats;
857
			self->sleep(1000);
858
			trans_log->txn_GetStats(&stats);
859
			
860
			
861
			if (!(header%20)) {
862
				for (int i = 0; i < num_threads; i++) {				
863
					if (writer[i]->myActivity == 0) {
864
						printf("Writer thread %d HUNG!!!\n", i);
865
					}
866
					writer[i]->myActivity = 0;
867
				}
868
				
869
				if (TransReader->myActivity == 0) {
870
					printf("Reader thread HUNG!!!\n");
871
				}
872
				TransReader->myActivity = 0;
873
					
874
				printf("%s | %s | %s | %s | %s | %s | %s | %s\n", "LogSize", "Full", "MaxSize", "Overflows", "Overflowing", "CacheSize", "Cache Used", "Cache Hit");
875
			}
876
			header++;
877
			//printf("Writes: %d \t\t Reads: %d \t%d \t start: %"PRIu64"\t\t eol:%"PRIu64"\n", count, TransReader->count, count - TransReader->count, trans_log->txn_Start, trans_log->txn_EOL);
878
			printf("%7llu | %3d%% | %7llu | %9d | %11s | %9d | %9d%% | %9d%%\n",// | \t\t\t%"PRIu64" \t%"PRIu64"\n", 
879
				stats.ts_LogSize,
880
				stats.ts_PercentFull,
881
				stats.ts_MaxSize,
882
				stats.ts_OverflowCount,
883
				(stats.ts_IsOverflowing)?"Over Flow": "   ---   ",
884
				stats.ts_TransCacheSize,
885
				stats.ts_PercentTransCacheUsed,
886
				stats.ts_PercentCacheHit//, trans_log->txn_Start, trans_log->txn_EOL
887
				);
888
				
889
				if (stats.ts_IsOverflowing && overflow_crash) {
890
					printf("Simulating crash while in overflow\n");
891
					exit(1);
892
				}
893
		}
894
895
#ifdef CRASH_TEST		
896
		if (crash_site) {
897
			printf("Crashing at crash site %d\n", crash_site);
898
			trans_test_crash_point = crash_site;
899
			// set the crash site and wait to die.
900
			while(1)
901
				self->sleep(1000);
902
		}
903
#endif
904
		
905
		printf("Shutting down the writer threads:\n");
906
		for (int i = 0; i < num_threads; i++) {
907
			writer[i]->stopit = true;
908
		}
909
		
910
		TransReader->stopit = true;
911
		// Give the writers a chance to shutdown by themselves.
912
		int cnt = 100;
913
		while (cnt) {
914
			int i;
915
			for (i = 0; i < num_threads && writer[i]->finished; i++);
916
			if (i == num_threads && TransReader->finished)
917
				break;
918
			self->sleep(10);	
919
			cnt--;			
920
		}
921
		
922
		for (int i = 0; i < num_threads; i++) {
923
			writer[i]->stop();
924
		}
925
		
926
	}
927
	rtc = 0;
928
	catch_(a) {
929
		printf("Main thread abort.\n");
930
		self->logException();
931
	}
932
	cont_(a);
933
	if (writer) {
934
		for (int i = 0; i < num_threads; i++) {
935
			writer[i]->stop();
936
			writer[i]->release();
937
		}
938
		cs_free(writer);
939
	}
940
		
941
done:
942
	TransReader->stop();
943
	release_(TransReader);
944
	
945
	outer_();
946
	
947
	thread_list->stopAllThreads();
948
	deinit_env();
949
	mysql_close(mysql);
950
	exit(rtc);
951
}
952
953
#endif // UNIT_TEST