1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
* PBMS transaction handling test driver.
27
* This is a test driver for the PBMS transaction log. It uses 2 tables in a database and
28
* inserts transaction records into 1 while writing them to the transaction log. The transaction
29
* log reader thread reads the transactions from the log and writes them to the second table.
30
* After a recovery the 2 tables should be identical.
32
* Built-in crash points can be triggered to test that the recovery works correctly.
45
#include "cslib/CSConfig.h"
46
#include "cslib/CSGlobal.h"
47
#include "cslib/CSThread.h"
48
#include "cslib/CSStrUtil.h"
49
#include "cslib/CSStorage.h"
51
#include "trans_cache_ms.h"
52
#include "trans_log_ms.h"
56
// Column list appended to the "create table" statements for both LOG_TABLE and
// REF_TABLE in init_database(), so the log table and the reference tables
// share one layout.
// NOTE(review): the primary key names a tab_id column whose definition is not
// among the lines visible in this excerpt; confirm against the full macro.
#define CREATE_TABLE_BODY "\
58
blob_ref INT NOT NULL AUTO_INCREMENT,\
60
blob_id BIGINT NOT NULL, \
61
committed BOOLEAN NOT NULL DEFAULT 0, \
62
PRIMARY KEY (blob_ref, tab_id)\
70
// Table the writer threads insert blob references into; the log reader thread
// later marks those rows committed or deletes them.
#define LOG_TABLE "translog"
71
// printf-style name of a per-writer reference table; %d receives the table id.
#define REF_TABLE "transref_%d"
72
// NOTE(review): presumably the upper limit for num_threads; no use of it is visible in this excerpt.
#define MAX_THREADS 20
76
// Database that new_connection() creates when missing and selects with "use".
#define TEST_DATABASE_NAME "TransTest"
77
// Connection parameters handed to mysql_real_connect() in new_connection().
static const char *user_name = "root";
78
static const char *user_passwd = "";
79
static int port = 3306;
80
static const char *host = "localhost";
81
// Value passed to usleep() on each pass of the writer loop in generate_records().
static int nap_time = 1000;
82
static int max_transaction = 10; // The maximum number of records generated per transaction
83
// overflow_crash is set by process_args() when the requested crash site equals
// MAX_CRASH_POINT + 1; main() then simulates a crash once the log reports it is overflowing.
// NOTE(review): dump_log is presumably set by the -d option; the assignment is not visible here.
static bool dump_log = false, overflow_crash = false;
84
static int crash_site = 0; // The location to crash at.
85
static int num_threads = 1; // The number of writer threads.
86
//static int rate = 1000; // The maximum transactions per second to allow.
87
static time_t timeout = 60; // How long to run for before crashing or shutting down.
88
// NOTE(review): the name looks like a typo for "recover_only"; presumably set by the -r option. Confirm before renaming.
static bool revover_only = false;
89
// NOTE(review): presumably set by the -n option (recreate tables and log); the assignment is not visible here.
static bool recreate = false;
91
// Sizes handed to txn_SetCacheSize() and txn_SetLogSize() in main().
static uint32_t cache_size = 0, log_size = 0;
93
// The transaction log under test; created in init_env() and released in deinit_env().
static MSTrans *trans_log;
95
// Thread list created in init_env() and passed to the CSDaemon base of every test thread.
static CSThreadList *thread_list;
97
// Forward declaration: the thread factories below call this before its definition.
static MYSQL *new_connection(bool check_for_db);
99
// Thread object created for the main thread in init_env().
static CSThread *main_thread;
101
//------------------------------------------------
102
// Base class of the reader and writer test daemons.
// NOTE(review): only fragments of this class are visible in this excerpt; the
// members used elsewhere in the file (mysql, myLog, log, stopit, finished, ...)
// are presumably declared in the lines that are missing.
class TransTestThread : public CSDaemon {
105
// Constructor initializer: hands the global thread_list to the CSDaemon base.
CSDaemon(thread_list),
131
// Default work routine: does nothing and returns true.
virtual bool doWork() {return true;}
134
//------------------------------------------------
135
// Writer daemon: generate_records() issues random inserts and deletes against
// one reference table and records each of them in the transaction log.
// NOTE(review): only fragments of this class are visible in this excerpt.
class TransTestWriterThread : public TransTestThread {
137
TransTestWriterThread():TransTestThread() {}
142
// Main loop of the writer; defined further down in the file.
void generate_records();
150
// Factory: allocates a writer, opens its private log file "write_<id>.log"
// and gives it its own MySQL connection. main() passes ids starting at 1.
static TransTestWriterThread *newTransTestWriterThread(uint32_t id)
152
TransTestWriterThread *tt;
156
new_(tt, TransTestWriterThread());
159
sprintf(name, "write_%d.log", id);
161
// "w+" truncates an existing log file, "a+" appends to it.
// NOTE(review): the condition choosing between the two opens is not visible
// here (presumably the recreate flag), and no check of the fopen() result is
// visible before the file is written to.
tt->myLog = fopen(name, "w+");
163
tt->myLog = fopen(name, "a+");
164
fprintf(tt->myLog, "====================================================\n");
168
// Separate connection for this writer; false is passed for check_for_db.
tt->mysql = new_connection(false);
178
//------------------------------------------------
179
// Reader daemon: processTransactionLog() pulls completed transactions from
// the transaction log and applies them to LOG_TABLE.
// NOTE(review): only fragments of this class are visible in this excerpt.
class TransTestReaderThread : public TransTestThread {
181
TransTestReaderThread():TransTestThread(){}
184
// Main loop of the reader; defined further down in the file.
void processTransactionLog();
187
// NOTE(review): presumably the body of the daemon's work/run override; the
// enclosing method header is not visible here.
processTransactionLog();
191
// Factory: allocates the reader, gives it its own MySQL connection and
// registers it as the reader of the given transaction log.
static TransTestReaderThread *newTransTestReaderThread(MSTrans *log)
193
TransTestReaderThread *tt;
196
new_(tt, TransTestReaderThread());
197
tt->mysql = new_connection(false);
201
tt->log->txn_SetReader(tt); // The reader daemon is passed in unreferenced.
202
tt->recovering = false;
206
// Looks up the (blob_ref, tab_id) pair in LOG_TABLE; found is set when the
// query yields exactly one row.
// NOTE(review): the declarations of stmt and found, the return statement and
// the handling after the error printouts are not visible in this excerpt.
bool rec_found(uint64_t id, uint32_t tab_id)
209
MYSQL_RES *results = NULL;
212
sprintf(stmt, "SELECT blob_ref FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %"PRIu32"", id, tab_id);
213
if (mysql_query(mysql, stmt)) {
214
printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
215
printf("%s\n", stmt);
220
results = mysql_store_result(mysql);
222
// NOTE(review): presumably guarded by a check that results is NULL; the guard is not visible here.
printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
226
found = (mysql_num_rows(results) == 1);
227
mysql_free_result(results);
236
// The log reader thread; created in main() by newTransTestReaderThread().
TransTestReaderThread *TransReader;
237
//------------------------------------------------
238
// Prints the connection's current MySQL error number and message together
// with the source line of the failing call.
//
// mysql: connection whose error state is reported.
// line:  source line of the call site (callers pass __LINE__).
// msg:   statement or API name that failed.
//
// NOTE(review): how msg is used and what happens after the printout (callers
// do not test for failure afterwards, so this presumably throws or exits) is
// in lines that are not visible in this excerpt.
static void report_mysql_error(MYSQL *mysql, int line, const char *msg)
240
printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), line);
247
//------------------------------------------------
248
// Opens a new MySQL connection with the file-level host, port and user
// settings and selects TEST_DATABASE_NAME on it.
//
// check_for_db: main() passes true, the thread factories pass false.
// NOTE(review): presumably this flag guards the "show databases" / "create
// database" section below; the guarding if is not visible in this excerpt,
// nor is the return statement.
static MYSQL *new_connection(bool check_for_db)
252
mysql = mysql_init(NULL);
254
printf( "mysql_init() failed.\n");
258
// No default database is given here; it is selected by the "use" statement at the end.
if (mysql_real_connect(mysql, host, user_name, user_passwd, NULL, port, NULL, 0) == NULL)
259
report_mysql_error(mysql, __LINE__, "mysql_real_connect()");
262
MYSQL_RES *results = NULL;
264
if (mysql_query(mysql, "show databases like \"" TEST_DATABASE_NAME "\""))
265
report_mysql_error(mysql, __LINE__, "show databases like \"" TEST_DATABASE_NAME "\"");
267
results = mysql_store_result(mysql);
269
report_mysql_error(mysql, __LINE__, "mysql_store_result()");
272
// Create the test database unless the lookup returned exactly one row.
if (mysql_num_rows(results) != 1) {
273
if (mysql_query(mysql, "create database " TEST_DATABASE_NAME ))
274
report_mysql_error(mysql, __LINE__, "create database " TEST_DATABASE_NAME );
276
mysql_free_result(results);
279
if (mysql_query(mysql, "use " TEST_DATABASE_NAME ))
280
report_mysql_error(mysql, __LINE__, "use " TEST_DATABASE_NAME );
285
//------------------------------------------------
286
// Resets the test state: removes the transaction log file and drops and
// recreates LOG_TABLE and the reference table(s).
//
// mysql: connection to run the statements on.
// cnt:   used as the table id in the REF_TABLE name; main() passes num_threads.
// NOTE(review): presumably the reference-table statements sit in a loop that
// counts cnt down over all table ids; the loop lines and the declaration of
// stmt are not visible in this excerpt.
static void init_database(MYSQL *mysql, int cnt)
290
unlink("ms-trans-log.dat");
291
// The result of the drop is deliberately not checked ("if exists").
mysql_query(mysql, "drop table if exists " LOG_TABLE ";");
293
if (mysql_query(mysql, "create table " LOG_TABLE CREATE_TABLE_BODY ";")){
294
printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
299
sprintf(stmt, "drop table if exists " REF_TABLE ";", cnt);
300
mysql_query(mysql, stmt);
301
sprintf(stmt, "create table " REF_TABLE CREATE_TABLE_BODY ";", cnt);
302
if (mysql_query(mysql, stmt)){
303
printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
311
//------------------------------------------------
312
// Prints the command-line usage summary of the test driver.
//
// app: program name shown at the start of the usage line (callers pass argv[0]).
//
// NOTE(review): only the printf() calls are visible in this excerpt; the
// function presumably terminates the program afterwards in a line that is not
// shown. Confirm against the full file.
static void display_help(const char *app)
314
printf("\nUsage:\n");
315
printf("%s -help | -r [-t<num_threads>] | -d | [-n] [-sc <cache_size>] [-sl <log_size>] [-c <crash_site>] [-t<num_threads>] [<timeout>]\n\n", app);
317
printf("-r: Test recovery after a crash or shutdown.\n");
318
printf("-d: Dump the transaction log.\n");
319
printf("-n: Recreate the tables and recovery log.\n");
320
// MAX_CRASH_POINT + 1 is accepted as well: process_args() treats it as "crash while the log is overflowing".
printf("-c <crash_site>: Crash at this location rather than shutting down. Max = %d\n", MAX_CRASH_POINT+1);
321
printf("-t<num_threads>: The number of writer threads to use, default is %d.\n", num_threads);
322
//printf("-r<rate>: The number af records to be inserted per second, default is %d.\n", rate);
323
// timeout is a time_t, which need not have the size of int; convert it
// explicitly and print it as a long so format and argument agree.
printf("<timeout>: The number seconds the test should run before shuttingdown or crashing, default is %ld.\n\n", (long) timeout);
327
//---------------------------------
328
// Parses the command line into the file-level option variables; every
// malformed option ends in display_help().
//
// argc, argv: the arguments of main().
//
// NOTE(review): the case labels, the index increments and most of the
// conditions are not visible in this excerpt, so the comments below only
// describe what the visible statements do.
static void process_args(int argc, const char * argv[])
333
for (int i = 1; i < argc; ) {
334
if ( argv[i][0] != '-') { // Must be timeout
335
timeout = atoi(argv[i]);
337
// A zero (or non-numeric) timeout is rejected.
// NOTE(review): i is presumably advanced on the missing line just above, so
// that "i != argc" rejects anything following the timeout; confirm.
if ((i != argc) || !timeout)
338
display_help(argv[0]);
340
// Dispatch on the option letter that follows the '-'.
switch (argv[i][1]) {
342
display_help(argv[0]);
346
if (argc > 4 || argv[i][2])
347
display_help(argv[0]);
353
if (argc != 2 || argv[i][2])
354
display_help(argv[0]);
361
display_help(argv[0]);
368
display_help(argv[0]);
370
// Crash site: MAX_CRASH_POINT + 1 selects "crash while overflowing"; zero or
// anything above MAX_CRASH_POINT is invalid.
crash_site = atoi(argv[i]);
371
if (crash_site == (MAX_CRASH_POINT + 1))
372
overflow_crash = true;
373
else if ((!crash_site) || (crash_site > MAX_CRASH_POINT))
374
display_help(argv[0]);
381
// Size option: the value is the next argument; the third character of the
// option selects the target ('c' or 'l', matching -sc and -sl in the help text).
size = atol(argv[i+1]);
383
display_help(argv[0]);
385
if (argv[i][2] == 'c')
387
else if (argv[i][2] == 'l')
390
display_help(argv[0]);
398
display_help(argv[0]);
400
num_threads = atoi(argv[i]);
402
display_help(argv[0]);
408
// NOTE(review): the file-level definition of rate is commented out, so this
// assignment only compiles if it is itself disabled in lines not visible here.
rate = atoi(argv[i]);
410
display_help(argv[0]);
415
display_help(argv[0]);
422
//---------------------------------
423
// Sets up the runtime for the test: creates and installs the main thread
// object, opens the transaction log and creates the thread list.
// NOTE(review): the startup call and the try/catch framing around these
// statements are not visible in this excerpt.
static void init_env()
427
if (!(main_thread = CSThread::newCSThread())) {
428
CSException::logOSError(CS_CONTEXT, ENOMEM);
432
CSThread::setSelf(main_thread);
436
// The literal true is passed where the dump_log variable has been commented out.
trans_log = MSTrans::txn_NewMSTrans("./ms-trans-log.dat", /*dump_log*/ true);
437
new_(thread_list, CSThreadList());
440
// NOTE(review): presumably the exception path: log the exception and shut the thread system down.
self->logException();
441
CSThread::shutDown();
447
//---------------------------------
448
// Releases what init_env() created: the thread list, the transaction log and
// the main thread object, then shuts the thread system down.
// NOTE(review): any guards around the individual releases are not visible in this excerpt.
static void deinit_env()
451
thread_list->release();
456
trans_log->release();
461
main_thread->release();
465
CSThread::shutDown();
468
//---------------------------------
469
// Checks that LOG_TABLE and the per-writer reference tables agree.
//
// Steps visible in this excerpt:
//   1. Count and print the log rows that are still uncommitted.
//   2. Load every reference table (ordered by blob_ref) and sum the row counts.
//   3. Load the whole log table (ordered by blob_ref).
//   4. If the totals match, compare the tables row by row; otherwise check
//      that no reference row is missing from the log table.
//
// mysql: connection to run the queries on.
// Returns a bool.
// NOTE(review): the return statements, the declaration of stmt and several
// guards are not visible in this excerpt, so which value means success cannot
// be stated from here. No check of the malloc() result and no free() of
// r_results is visible either.
static bool verify_database(MYSQL *mysql)
471
MYSQL_RES **r_results, *l_results = NULL;
472
MYSQL_ROW r_record, l_record;
474
int i, log_row_cnt, ref_row_cnt = 0, tab_id;
477
// One result set per writer thread / reference table.
r_results = (MYSQL_RES **) malloc(num_threads * sizeof(MYSQL_RES *));
479
if (mysql_query(mysql, "select * from "LOG_TABLE" where committed = 0 order by blob_ref"))
480
// Report the statement that was actually sent, including its where clause.
report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" where committed = 0 order by blob_ref");
482
l_results = mysql_store_result(mysql);
484
report_mysql_error(mysql, __LINE__, "mysql_store_result()");
486
log_row_cnt = mysql_num_rows(l_results);
487
mysql_free_result(l_results);
489
printf("Uncommitted references: %d\n", log_row_cnt);
492
for (i =0; i < num_threads; i++) {
493
// Table ids are 1-based, result slots are 0-based.
sprintf(stmt, "select * from "REF_TABLE" order by blob_ref", i+1);
494
if (mysql_query(mysql, stmt))
495
report_mysql_error(mysql, __LINE__, stmt);
497
r_results[i] = mysql_store_result(mysql);
499
report_mysql_error(mysql, __LINE__, "mysql_store_result()");
501
ref_row_cnt += mysql_num_rows(r_results[i]);
504
if (mysql_query(mysql, "select * from "LOG_TABLE" order by blob_ref"))
505
report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
507
l_results = mysql_store_result(mysql);
509
report_mysql_error(mysql, __LINE__, "mysql_store_result()");
511
log_row_cnt = mysql_num_rows(l_results);
513
// More reference rows than log rows is a failure; fewer only draws a warning
// about possibly unreferenced BLOBs.
if (log_row_cnt != ref_row_cnt) {
514
if (ref_row_cnt > log_row_cnt) {
515
printf("verify_database() Failed: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt, ref_row_cnt);
519
printf("verify_database() Warnning: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt, ref_row_cnt);
520
printf("Possible unreferenced BLOBs\n");
523
if (log_row_cnt == ref_row_cnt) {
524
for ( i = 0; i < log_row_cnt; i++) {
525
l_record = mysql_fetch_row(l_results);
526
// The second column of the log row selects which reference result set
// supplies the row to compare against.
// NOTE(review): tab_id is used as an index without a visible range check.
tab_id = atol(l_record[1]);
527
r_record = mysql_fetch_row(r_results[tab_id-1]);
528
// Compare the first three columns numerically.
if ((atol(l_record[0]) != atol(r_record[0])) ||
529
(atol(l_record[1]) != atol(r_record[1])) ||
530
(atol(l_record[2]) != atol(r_record[2]))) {
532
printf("verify_database() Failed: in row %d, tab_id %d\n", i+1, tab_id);
533
// atol() returns long, so the values are printed with %ld.
printf("field 1: %ld =? %ld\n", atol(l_record[0]), atol(r_record[0]));
534
printf("field 2: %ld =? %ld\n", atol(l_record[1]), atol(r_record[1]));
535
printf("field 3: %ld =? %ld\n", atol(l_record[2]), atol(r_record[2]));
540
} else { // The important thing is that there are no BLOBs in the ref tables that are not in the log table.
542
for (i =0; i < num_threads; i++) {
543
// Drop the full result set before reusing the slot for the difference query.
mysql_free_result(r_results[i]);
545
sprintf(stmt, "select * from "REF_TABLE" where blob_ref not in (select blob_ref from TransTest.translog where tab_id = %d)", i+1, i+1);
546
if (mysql_query(mysql, stmt))
547
report_mysql_error(mysql, __LINE__, stmt);
549
r_results[i] = mysql_store_result(mysql);
551
report_mysql_error(mysql, __LINE__, "mysql_store_result()");
553
// Any row returned is a reference that the log table does not know about.
if (mysql_num_rows(r_results[i])) {
554
printf("verify_database() Failed, Missing BLOBs: %s\n", stmt);
560
printf("verify_database() OK.\n");
565
for (i =0; i < num_threads; i++) {
566
mysql_free_result(r_results[i]);
570
mysql_free_result(l_results);
574
// NOTE(review): presumably only done when verification failed; the guard is not visible here.
trans_log->txn_DumpLog("trace.log");
580
//------------------------------------------------
581
// Reader main loop: fetches completed transactions from the transaction log
// and applies each one to LOG_TABLE until the thread is told to stop.
//
//   committed  + dereference: delete the row
//   committed  + reference:   mark the row committed
//   rolled back + reference:  delete the row that was inserted up front
//   recovered:                print the record and ignore it
//
// NOTE(review): the declarations of stmt and state, the try/catch framing and
// several braces are not visible in this excerpt.
void TransTestReaderThread::processTransactionLog()
583
MSTransRec rec = {0};
586
uint32_t last_tid = 0;
589
// Read in transactions from the log and update
590
// the database table based on them.
593
while (!myMustQuit && !stopit) {
594
// This will sleep while waiting for the next
595
// completed transaction.
596
log->txn_GetNextTransaction(&rec, &state);
602
// With a single writer the transaction ids must arrive in order: the same id
// again, the next id, or the very first record.
if (num_threads == 1) {
603
ASSERT( ((last_tid + 1) == rec.tr_id) || (last_tid == rec.tr_id) || !last_tid);
604
last_tid = rec.tr_id;
610
// Check that the record type is one the reader expects.
switch (TRANS_TYPE(rec.tr_type)) {
611
case MS_ReferenceTxn:
612
case MS_DereferenceTxn:
615
case MS_RecoveredTxn:
618
printf("Unexpected transaction type: %d\n", rec.tr_type);
622
if (state == MS_Committed){
623
// Dereferences are applied when the transaction is committed.
624
// References are applied immediately and removed if the transaction is rolled back.
625
if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) {
626
sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
627
if (mysql_query(mysql, stmt))
628
report_mysql_error(mysql, __LINE__, stmt);
629
} else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
630
sprintf(stmt, "UPDATE "LOG_TABLE" SET committed = 1 WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
631
if (mysql_query(mysql, stmt))
632
report_mysql_error(mysql, __LINE__, stmt);
634
} else if (state == MS_RolledBack) {
635
//printf("ROLLBACK!\n");
636
// Undo the reference that the writer inserted into the log table up front.
if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
637
sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
638
if (mysql_query(mysql, stmt))
639
report_mysql_error(mysql, __LINE__, stmt);
641
} else if (state == MS_Recovered) {
642
printf("Recovered transaction being ignored:\n");
643
printf("blob_ref = %"PRIu64", tab_id = %d, blob_id = %"PRIu64"\n\n", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
645
printf("Unexpected transaction state: %d\n", state);
653
// NOTE(review): presumably the exception handler of the loop; the catch line is not visible here.
self->logException();
654
printf("\n\n!!!!!!!! THE TRANSACTION LOG READER DIED! !!!!!!!!!!!\n\n");
655
if (!myMustQuit && !stopit)
659
printf("The transaction log reader shutting down.\n");
663
//------------------------------------------------
664
// Writer main loop: until told to stop, builds transactions of random length
// made of random inserts into, or deletes from, this writer's reference
// table, writes every operation to the transaction log and ends the
// transaction with a commit or, rarely, a rollback.
//
// NOTE(review): most declarations (stmt, tab_id, blob_id, tsize, i, cnt,
// record, rollback, ...), the try/catch framing and many braces and
// conditions are not visible in this excerpt.
void TransTestWriterThread::generate_records()
669
uint64_t blob_ref_id;
677
while (!myMustQuit && !stopit) {
680
usleep(nap_time); // Give up a bit of time
681
if (myMustQuit || stopit)
684
// Number of operations in this transaction; can be zero.
tsize = rand() % max_transaction;
686
// Statements are grouped into an explicit database transaction.
if (mysql_autocommit(mysql, 0))
687
report_mysql_error(mysql, __LINE__, "mysql_autocommit()");
691
do_delete = ((rand() %2) == 0);
693
// decide if this is an insert or delete
695
MYSQL_RES *results = NULL;
699
// If we are deleting then randomly select a record to delete
702
txn_type = MS_DereferenceTxn;
704
sprintf(stmt, "select * from "REF_TABLE, tab_id);
705
if (mysql_query(mysql, stmt))
706
report_mysql_error(mysql, __LINE__, stmt);
708
results = mysql_store_result(mysql);
710
report_mysql_error(mysql, __LINE__, "mysql_store_result()");
712
cnt = mysql_num_rows(results);
714
do_delete = false; // There is nothing to delete
716
// Pick a random row of the result set as the delete victim.
// NOTE(review): presumably only reached when cnt is non-zero; the guard is not visible here.
mysql_data_seek(results, rand()%cnt);
717
record = mysql_fetch_row(results);
719
// Column 0 is read as the blob reference id and column 2 as the blob id.
blob_ref_id = atol(record[0]);
720
blob_id = atol(record[2]);
722
sprintf(stmt, "DELETE FROM "REF_TABLE" WHERE blob_ref = %"PRIu64" AND blob_id = %"PRIu64"", tab_id, blob_ref_id, blob_id);
723
if (mysql_query(mysql, stmt))
724
report_mysql_error(mysql, __LINE__, stmt);
726
if (mysql_affected_rows(mysql) == 0)
727
do_delete = false; // Another thread must have deleted the row first.
729
fprintf(myLog, "DELETE %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);
732
mysql_free_result(results);
736
blob_id = self->myTID; // Assign the tid as the blob id to help with debugging.
737
txn_type = MS_ReferenceTxn;
739
// NULL lets the AUTO_INCREMENT column generate the blob reference id.
sprintf(stmt, "INSERT INTO "REF_TABLE" VALUES( NULL, %d, %"PRIu64", 0)", tab_id, tab_id, blob_id);
740
if (mysql_query(mysql, stmt))
741
report_mysql_error(mysql, __LINE__, stmt);
743
blob_ref_id = mysql_insert_id(mysql);
745
report_mysql_error(mysql, __LINE__, "mysql_insert_id() returned 0");
747
fprintf(myLog, "INSERT %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);
748
// Apply the blob reference now. This will be undone if the transaction is rolled back.
749
sprintf(stmt, "INSERT INTO "LOG_TABLE" VALUES(%"PRIu64", %d, %"PRIu64", 0)", blob_ref_id, tab_id, blob_id);
750
if (mysql_query(mysql, stmt))
751
report_mysql_error(mysql, __LINE__, stmt);
756
if (i >= tsize) { //Commit the database transaction before the log transaction.
759
// Roughly one non-empty transaction in a thousand is rolled back instead of committed.
rollback = ((tsize > 0) && ((rand() % 1000) == 0));
761
printf("Rollback\n");
762
if (mysql_rollback(mysql)) // roll the statements back in the database,
763
report_mysql_error(mysql, __LINE__, "mysql_rollback()");
764
fprintf(myLog, "Rollback %"PRIu32"\n", self->myTID);
765
log->txn_LogTransaction(MS_RollBackTxn);
767
if (mysql_commit(mysql)) // commit the statement to the database,
768
report_mysql_error(mysql, __LINE__, "mysql_commit()");
769
fprintf(myLog, "Commit %"PRIu32"\n", self->myTID);
770
// The second argument is true here, for the last operation of the transaction.
log->txn_LogTransaction(txn_type, true, A_DB_ID, tab_id, blob_id, blob_ref_id);
773
// The second argument is false here, while more operations are still to follow.
log->txn_LogTransaction(txn_type, false, A_DB_ID, tab_id, blob_id, blob_ref_id);
775
} while ( i < tsize);
781
// NOTE(review): presumably the exception handler of the loop; the catch line is not visible here.
self->logException();
782
printf("\n\nA writer thread for table %d died! \n\n", tab_id);
784
printf(" It is possible that the last %d operations on table %d were committed to the database but not to the log.\n", tsize, tab_id);
786
if (!myMustQuit && !stopit)
790
printf("Writer thread for table %d is shutting down.\n", tab_id);
794
// SELECT * FROM TransTest.translog where blob_ref not in (select blob_ref from TransTest.transref)
795
// SELECT * FROM TransTest.transref_1 where blob_ref not in (select blob_ref from TransTest.translog where tab_id = 1)
796
// SELECT * FROM TransTest.translog where tab_id = 1 AND blob_ref not in (select blob_ref from TransTest.transref_1)
797
// select count(*) from TransTest.translog where committed = 1
798
//---------------------------------
799
// Test driver entry point.
//
// Visible flow: parse the options, connect to MySQL (creating the test
// database if needed), optionally recreate tables and log, start the log
// reader and wait for recovery to drain the log, verify the database, start
// the writer threads, print log statistics until the timeout expires, then
// either trigger the requested crash site or shut all threads down.
//
// NOTE(review): many lines (declarations of mysql and i, the init_env() and
// deinit_env() calls, the option checks guarding the individual steps, sleeps
// and braces) are not visible in this excerpt; the comments below describe
// the visible statements only.
int main (int argc, const char * argv[])
802
TransTestWriterThread **writer = NULL;
805
process_args(argc, argv);
807
// true is passed for check_for_db on this first connection.
mysql = new_connection(true);
810
// NOTE(review): presumably only done when the recreate option was given; the guard is not visible here.
init_database(mysql, num_threads);
816
printf("LOG dumped\n");
820
TransReader = TransTestReaderThread::newTransTestReaderThread(trans_log);
822
TransReader->recovering = true;
823
TransReader->start();
825
// wait until the recovery is complete.
826
while (trans_log->txn_GetNumRecords())
829
TransReader->recovering = false;
832
trans_log->txn_SetLogSize(log_size);
835
trans_log->txn_SetCacheSize(cache_size);
838
TransReader->stopit = true;
839
if (verify_database(mysql))
845
writer = (TransTestWriterThread **) cs_malloc(num_threads * sizeof(TransTestWriterThread *));
846
for (int i = 0; i < num_threads; i++) {
847
// Writer ids, and with them table ids, start at 1.
TransTestWriterThread *wt = TransTestWriterThread::newTransTestWriterThread(i+1);
852
// timeout is a time_t, which need not have the size of int; convert it
// explicitly and print it as a long so format and argument agree.
printf("Timeout: %ld seconds\n", (long) timeout);
853
// From here on timeout holds the absolute end time.
timeout += time(NULL);
855
while (timeout > time(NULL)) {
856
MSTransStatsRec stats;
858
trans_log->txn_GetStats(&stats);
862
// Watchdog: a thread whose activity counter is still zero since the reset in
// the previous pass is reported as hung.
// NOTE(review): the code that raises myActivity is not visible in this excerpt.
for (int i = 0; i < num_threads; i++) {
863
if (writer[i]->myActivity == 0) {
864
printf("Writer thread %d HUNG!!!\n", i);
866
writer[i]->myActivity = 0;
869
if (TransReader->myActivity == 0) {
870
printf("Reader thread HUNG!!!\n");
872
TransReader->myActivity = 0;
874
printf("%s | %s | %s | %s | %s | %s | %s | %s\n", "LogSize", "Full", "MaxSize", "Overflows", "Overflowing", "CacheSize", "Cache Used", "Cache Hit");
877
//printf("Writes: %d \t\t Reads: %d \t%d \t start: %"PRIu64"\t\t eol:%"PRIu64"\n", count, TransReader->count, count - TransReader->count, trans_log->txn_Start, trans_log->txn_EOL);
878
printf("%7llu | %3d%% | %7llu | %9d | %11s | %9d | %9d%% | %9d%%\n",// | \t\t\t%"PRIu64" \t%"PRIu64"\n",
880
stats.ts_PercentFull,
882
stats.ts_OverflowCount,
883
(stats.ts_IsOverflowing)?"Over Flow": " --- ",
884
stats.ts_TransCacheSize,
885
stats.ts_PercentTransCacheUsed,
886
stats.ts_PercentCacheHit//, trans_log->txn_Start, trans_log->txn_EOL
889
// Crash site MAX_CRASH_POINT + 1: crash as soon as the log is overflowing.
if (stats.ts_IsOverflowing && overflow_crash) {
890
printf("Simulating crash while in overflow\n");
897
printf("Crashing at crash site %d\n", crash_site);
898
trans_test_crash_point = crash_site;
899
// set the crash site and wait to die.
905
printf("Shutting down the writer threads:\n");
906
for (int i = 0; i < num_threads; i++) {
907
writer[i]->stopit = true;
910
TransReader->stopit = true;
911
// Give the writers a chance to shutdown by themselves.
915
// Empty-bodied scan: stops at the first writer that has not finished yet.
for (i = 0; i < num_threads && writer[i]->finished; i++);
916
if (i == num_threads && TransReader->finished)
922
for (int i = 0; i < num_threads; i++) {
929
// NOTE(review): presumably the exception handler of main; the catch line is not visible here.
printf("Main thread abort.\n");
930
self->logException();
934
for (int i = 0; i < num_threads; i++) {
936
writer[i]->release();
943
release_(TransReader);
947
thread_list->stopAllThreads();