2
Just a test application for threads.
7
#include <mysql_version.h>
10
#include <sys/types.h>
12
#include <sys/types.h>
25
#define snprintf _snprintf
30
#define DEFAULT_INITIAL_LOAD 10000
31
#define DEFAULT_EXECUTE_SECONDS 120
32
#define DEFAULT_CONCURRENCY 8
33
#define TEST_FILENAME "concurrency_test.az"
35
#define HUGE_STRING_LENGTH 8192
37
/* Global Thread counter */
38
unsigned int thread_counter;
39
pthread_mutex_t counter_mutex;
40
pthread_cond_t count_threshhold;
41
unsigned int master_wakeup;
42
pthread_mutex_t sleeper_mutex;
43
pthread_cond_t sleep_threshhold;
44
static my_bool timer_alarm= FALSE;
45
pthread_mutex_t timer_alarm_mutex;
46
pthread_cond_t timer_alarm_threshold;
48
pthread_mutex_t row_lock;
51
void *run_task(void *p);
52
void *timer_thread(void *p);
53
void scheduler(az_method use_aio);
54
void create_data_file(azio_stream *write_handler, unsigned long long rows);
55
unsigned int write_row(azio_stream *s);
57
typedef struct thread_context_st thread_context_st;
58
struct thread_context_st {
59
unsigned int how_often_to_write;
60
unsigned long long counter;
65
/* Use this for string generation */
66
static const char ALPHANUMERICS[]=
67
"0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz";
69
#define ALPHANUMERICS_SIZE (sizeof(ALPHANUMERICS)-1)
71
static void get_random_string(char *buffer, size_t size)
73
char *buffer_ptr= buffer;
76
*buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
77
*buffer_ptr++= ALPHANUMERICS[random() % ALPHANUMERICS_SIZE];
80
int main(int argc, char *argv[])
91
if (!(mysql_thread_safe()))
92
fprintf(stderr, "This application was compiled incorrectly. Please recompile with thread support.\n");
96
pthread_mutex_init(&counter_mutex, NULL);
97
pthread_cond_init(&count_threshhold, NULL);
98
pthread_mutex_init(&sleeper_mutex, NULL);
99
pthread_cond_init(&sleep_threshhold, NULL);
100
VOID(pthread_mutex_init(&timer_alarm_mutex, NULL));
101
VOID(pthread_cond_init(&timer_alarm_threshold, NULL));
102
VOID(pthread_mutex_init(&row_lock, NULL));
104
for (method= AZ_METHOD_BLOCK; method < AZ_METHOD_MAX; method++)
107
(void)pthread_mutex_destroy(&counter_mutex);
108
(void)pthread_cond_destroy(&count_threshhold);
109
(void)pthread_mutex_destroy(&sleeper_mutex);
110
(void)pthread_cond_destroy(&sleep_threshhold);
111
VOID(pthread_mutex_destroy(&timer_alarm_mutex));
112
VOID(pthread_cond_destroy(&timer_alarm_threshold));
113
VOID(pthread_mutex_destroy(&row_lock));
118
void scheduler(az_method use_aio)
121
unsigned long long total;
122
azio_stream writer_handle;
123
thread_context_st *context;
124
pthread_t mainthread; /* Thread descriptor */
125
pthread_attr_t attr; /* Thread attributes */
127
pthread_attr_init(&attr);
128
pthread_attr_setdetachstate(&attr,
129
PTHREAD_CREATE_DETACHED);
131
pthread_mutex_lock(&counter_mutex);
134
create_data_file(&writer_handle, DEFAULT_INITIAL_LOAD);
136
pthread_mutex_lock(&sleeper_mutex);
138
pthread_mutex_unlock(&sleeper_mutex);
140
context= (thread_context_st *)malloc(sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
141
bzero(context, sizeof(thread_context_st) * DEFAULT_CONCURRENCY);
145
fprintf(stderr, "Could not allocate memory for context\n");
149
for (x= 0; x < DEFAULT_CONCURRENCY; x++)
152
context[x].how_often_to_write= random()%1000;
153
context[x].writer= &writer_handle;
154
context[x].counter= 0;
155
context[x].use_aio= use_aio;
157
/* now you create the thread */
158
if (pthread_create(&mainthread, &attr, run_task,
159
(void *)context) != 0)
161
fprintf(stderr,"Could not create thread\n");
167
if (DEFAULT_EXECUTE_SECONDS)
169
time_t opt_timer_length= DEFAULT_EXECUTE_SECONDS;
170
pthread_mutex_lock(&timer_alarm_mutex);
172
pthread_mutex_unlock(&timer_alarm_mutex);
174
if (pthread_create(&mainthread, &attr, timer_thread,
175
(void *)&opt_timer_length) != 0)
177
fprintf(stderr,"%s: Could not create timer thread\n", my_progname);
182
pthread_mutex_unlock(&counter_mutex);
183
pthread_attr_destroy(&attr);
185
pthread_mutex_lock(&sleeper_mutex);
187
pthread_mutex_unlock(&sleeper_mutex);
188
pthread_cond_broadcast(&sleep_threshhold);
191
We loop until we know that all children have cleaned up.
193
pthread_mutex_lock(&counter_mutex);
194
while (thread_counter)
196
struct timespec abstime;
198
bzero(&abstime, sizeof(struct timespec));
201
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
203
pthread_mutex_unlock(&counter_mutex);
205
for (total= x= 0; x < DEFAULT_CONCURRENCY; x++)
206
total+= context[x].counter;
209
azclose(&writer_handle);
211
printf("Read %llu rows\n", total);
214
void *timer_thread(void *p)
216
time_t *timer_length= (time_t *)p;
217
struct timespec abstime;
219
if (mysql_thread_init())
221
fprintf(stderr,"%s: mysql_thread_init() failed.\n",
227
We lock around the initial call in case were we in a loop. This
228
also keeps the value properly syncronized across call threads.
230
pthread_mutex_lock(&sleeper_mutex);
231
while (master_wakeup)
233
pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
235
pthread_mutex_unlock(&sleeper_mutex);
237
set_timespec(abstime, *timer_length);
239
pthread_mutex_lock(&timer_alarm_mutex);
240
pthread_cond_timedwait(&timer_alarm_threshold, &timer_alarm_mutex, &abstime);
241
pthread_mutex_unlock(&timer_alarm_mutex);
243
pthread_mutex_lock(&timer_alarm_mutex);
245
pthread_mutex_unlock(&timer_alarm_mutex);
252
void *run_task(void *p)
254
thread_context_st *context= (thread_context_st *)p;
255
unsigned long long count;
258
azio_stream reader_handle;
260
if (mysql_thread_init())
262
fprintf(stderr,"%s: mysql_thread_init() failed.\n", my_progname);
266
if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY|O_BINARY,
269
printf("Could not open test file\n");
273
pthread_mutex_lock(&sleeper_mutex);
274
while (master_wakeup)
276
pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
278
pthread_mutex_unlock(&sleeper_mutex);
284
azread_init(&reader_handle);
285
while ((ret= azread_row(&reader_handle, &error)))
288
if (count % context->how_often_to_write)
290
write_row(context->writer);
293
/* If the timer is set, and the alarm is not active then end */
294
if (timer_alarm == FALSE)
298
pthread_mutex_lock(&counter_mutex);
300
pthread_cond_signal(&count_threshhold);
301
pthread_mutex_unlock(&counter_mutex);
302
azclose(&reader_handle);
309
void create_data_file(azio_stream *write_handler, unsigned long long rows)
312
unsigned long long x;
314
if (!(ret= azopen(write_handler, TEST_FILENAME, O_CREAT|O_RDWR|O_TRUNC|O_BINARY,
317
printf("Could not create test file\n");
321
for (x= 0; x < rows; x++)
322
write_row(write_handler);
324
azflush(write_handler, Z_SYNC_FLUSH);
327
unsigned int write_row(azio_stream *s)
330
char buffer[HUGE_STRING_LENGTH];
332
length= random() % HUGE_STRING_LENGTH;
334
/* Avoid zero length strings */
337
get_random_string(buffer, length);
338
pthread_mutex_lock(&row_lock);
339
azwrite_row(s, buffer, length);
340
pthread_mutex_unlock(&row_lock);